Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 03:41:23 UTC

[flink] branch release-1.9 updated (7838436 -> 3c74cc4)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7838436  [FLINK-13509][table-planner-blink] Forbid `IS NOT DISTINCT FROM` (or an expanded version) in LookupJoin.
     new 85e0538  [FLINK-13529][table-planner-blink] Rename CONCAT_AGG to LISTAGG and fix the behavior according to ANSI SQL
     new 48d5ba8  [FLINK-13529][table-planner-blink] Remove the second parameter of FIRST_VALUE and LAST_VALUE
     new 3c74cc4  [FLINK-13529][table-planner-blink] Remove APPROX_COUNT_DISTINCT and INCR_SUM in blink planner

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
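
A minimal sketch of the user-facing effect of the three commits, assuming a
table T1(id1, text) (names taken from the plan diffs further down; the SQL is
illustrative, not verbatim from the patches):

    -- before: blink-planner-specific CONCAT_AGG, delimiter as the first argument
    SELECT id1, CONCAT_AGG('#', text) FROM T1 GROUP BY id1;

    -- after: ANSI-style LISTAGG, delimiter as a trailing character literal
    SELECT id1, LISTAGG(text, '#') FROM T1 GROUP BY id1;

    -- after: single-argument form; the default delimiter is now ',' (was '\n')
    SELECT id1, LISTAGG(text) FROM T1 GROUP BY id1;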


Summary of changes:
 ...ConcatAggFunction.java => ListAggFunction.java} |  14 +--
 ...ion.java => ListAggWithRetractAggFunction.java} |  30 ++---
 ...n.java => ListAggWsWithRetractAggFunction.java} |  30 ++---
 .../functions/sql/FlinkSqlOperatorTable.java       |  10 +-
 .../sql/SqlFirstLastValueAggFunction.java          |  34 ++++--
 .../functions/sql/SqlIncrSumAggFunction.java       |  75 ------------
 ...catAggFunction.java => SqlListAggFunction.java} |  27 +++--
 .../metadata/FlinkRelMdModifiedMonotonicity.scala  |   2 -
 .../plan/rules/logical/SplitAggregateRule.scala    |   4 +-
 .../planner/plan/utils/AggFunctionFactory.scala    |  24 ++--
 .../table/planner/plan/utils/AggregateUtil.scala   |   4 +-
 ...java => ListAggWithRetractAggFunctionTest.java} |  16 +--
 ...va => ListAggWsWithRetractAggFunctionTest.java} |  36 +++---
 .../apache/flink/table/api/stream/ExplainTest.xml  |  48 ++++----
 .../planner/plan/batch/sql/RemoveCollationTest.xml |  16 +--
 .../plan/rules/logical/SplitAggregateRuleTest.xml  |   8 +-
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml |  40 +++----
 .../plan/stream/sql/agg/DistinctAggregateTest.xml  |  34 +++---
 .../stream/sql/agg/IncrementalAggregateTest.xml    |  10 +-
 .../flink/table/api/stream/ExplainTest.scala       |   4 +-
 .../plan/batch/sql/RemoveCollationTest.scala       |   4 +-
 .../FlinkRelMdModifiedMonotonicityTest.scala       |  11 --
 .../rules/logical/SplitAggregateRuleTest.scala     |  16 +--
 .../stream/sql/MiniBatchIntervalInferTest.scala    |   6 +-
 .../table/planner/plan/stream/sql/RankTest.scala   |  75 ------------
 .../stream/sql/agg/DistinctAggregateTest.scala     |  17 +--
 .../table/validation/AggregateValidationTest.scala |  55 +++++++--
 .../runtime/batch/sql/agg/SortAggITCase.scala      |  11 +-
 .../sql/agg/SortDistinctAggregateITCase.scala      |  10 +-
 .../runtime/stream/sql/AggregateITCase.scala       | 127 ++-------------------
 .../runtime/stream/sql/OverWindowITCase.scala      |  37 +++---
 .../runtime/stream/sql/SplitAggregateITCase.scala  |  22 ----
 .../runtime/stream/table/AggregateITCase.scala     |   2 +-
 33 files changed, 289 insertions(+), 570 deletions(-)
 rename flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/{ConcatAggFunction.java => ListAggFunction.java} (91%)
 rename flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/{ConcatWithRetractAggFunction.java => ListAggWithRetractAggFunction.java} (77%)
 rename flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/{ConcatWsWithRetractAggFunction.java => ListAggWsWithRetractAggFunction.java} (77%)
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java
 rename flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/{SqlConcatAggFunction.java => SqlListAggFunction.java} (71%)
 rename flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/{ConcatWithRetractAggFunctionTest.java => ListAggWithRetractAggFunctionTest.java} (79%)
 rename flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/{ConcatWsWithRetractAggFunctionTest.java => ListAggWsWithRetractAggFunctionTest.java} (78%)


[flink] 03/03: [FLINK-13529][table-planner-blink] Remove APPROX_COUNT_DISTINCT and INCR_SUM in blink planner

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3c74cc4623f23afbf63f6cb4e1db162439702981
Author: beyond1920 <be...@126.com>
AuthorDate: Fri Aug 2 10:24:35 2019 +0800

    [FLINK-13529][table-planner-blink] Remove APPROX_COUNT_DISTINCT and INCR_SUM in blink planner
    
    - Remove APPROX_COUNT_DISTINCT for now because it is not supported yet.
    - Remove INCR_SUM because it is not a standard aggregate function.
    
    This closes #9316
---
 .../functions/sql/FlinkSqlOperatorTable.java       |  6 --
 .../functions/sql/SqlIncrSumAggFunction.java       | 75 ----------------------
 .../metadata/FlinkRelMdModifiedMonotonicity.scala  |  2 -
 .../planner/plan/utils/AggFunctionFactory.scala    |  4 +-
 .../FlinkRelMdModifiedMonotonicityTest.scala       | 11 ----
 .../table/planner/plan/stream/sql/RankTest.scala   | 75 ----------------------
 .../sql/agg/SortDistinctAggregateITCase.scala      | 10 +--
 .../runtime/stream/sql/AggregateITCase.scala       | 36 -----------
 8 files changed, 2 insertions(+), 217 deletions(-)
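
For queries that used INCR_SUM, plain SUM computes the same result: per the
removed javadoc below, INCR_SUM differed from SUM only in declaring INCREASING
modified monotonicity, which the planner now infers for SUM from filtered
column intervals. A minimal sketch, reusing the schema of the removed
testIncrSum:

    -- no longer accepted by the blink planner:
    SELECT b, INCR_SUM(a) FROM T GROUP BY b;

    -- standard replacement:
    SELECT b, SUM(a) FROM T GROUP BY b;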

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index dbabb69..e201400 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -950,11 +950,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	 */
 	public static final SqlListAggFunction LISTAGG = new SqlListAggFunction();
 
-	/**
-	 * <code>INCR_SUM</code> aggregate function.
-	 */
-	public static final SqlIncrSumAggFunction INCR_SUM = new SqlIncrSumAggFunction();
-
 	// -----------------------------------------------------------------------------
 	// Window SQL functions
 	// -----------------------------------------------------------------------------
@@ -1072,7 +1067,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlAggFunction SUM = SqlStdOperatorTable.SUM;
 	public static final SqlAggFunction SUM0 = SqlStdOperatorTable.SUM0;
 	public static final SqlAggFunction COUNT = SqlStdOperatorTable.COUNT;
-	public static final SqlAggFunction APPROX_COUNT_DISTINCT = SqlStdOperatorTable.APPROX_COUNT_DISTINCT;
 	public static final SqlAggFunction COLLECT = SqlStdOperatorTable.COLLECT;
 	public static final SqlAggFunction MIN = SqlStdOperatorTable.MIN;
 	public static final SqlAggFunction MAX = SqlStdOperatorTable.MAX;
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java
deleted file mode 100644
index 0b92186..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.functions.sql;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
-
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlSplittableAggFunction;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-import java.util.List;
-
-/**
- * <code>INCR_SUM</code> is an aggregator which returns the sum of the values which
- * go into it like SUM. It differs in that the modified monotonicity of
- * INCR_SUM is INCREASING, while that of SUM should be inferred using
- * extra information.
- */
-public class SqlIncrSumAggFunction extends SqlAggFunction {
-
-	public SqlIncrSumAggFunction() {
-		super(
-				"INCR_SUM",
-				null,
-				SqlKind.SUM,
-				ReturnTypes.AGG_SUM,
-				null,
-				OperandTypes.NUMERIC,
-				SqlFunctionCategory.NUMERIC,
-				false,
-				false);
-	}
-
-	@Override
-	public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) {
-		return ImmutableList.of(
-				typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true));
-	}
-
-	@Override
-	public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
-		return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true);
-	}
-
-	@Override
-	public <T> T unwrap(Class<T> clazz) {
-		if (clazz == SqlSplittableAggFunction.class) {
-			return clazz.cast(SqlSplittableAggFunction.CountSplitter.INSTANCE);
-		} else {
-			return super.unwrap(clazz);
-		}
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
index 57de91f..9c6c77a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.functions.sql.SqlIncrSumAggFunction
 import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
 import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
 import org.apache.flink.table.planner.plan.metadata.FlinkMetadata.ModifiedMonotonicity
@@ -337,7 +336,6 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
         case SqlKind.MIN => DECREASING
         case _ => NOT_MONOTONIC
       }
-      case _: SqlIncrSumAggFunction => INCREASING
       case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction =>
         val valueInterval = fmq.getFilteredColumnInterval(
           input, aggCall.getArgList.head, aggCall.filterArg)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index a8e00d3..e8707f2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.planner.functions.aggfunctions.MinWithRetractAggFu
 import org.apache.flink.table.planner.functions.aggfunctions.SingleValueAggFunction._
 import org.apache.flink.table.planner.functions.aggfunctions.SumWithRetractAggFunction._
 import org.apache.flink.table.planner.functions.aggfunctions._
-import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, SqlFirstLastValueAggFunction, SqlIncrSumAggFunction}
+import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, SqlFirstLastValueAggFunction}
 import org.apache.flink.table.planner.functions.utils.AggSqlFunction
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
 import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo
@@ -75,8 +75,6 @@ class AggFunctionFactory(
 
       case _: SqlSumEmptyIsZeroAggFunction => createSum0AggFunction(argTypes)
 
-      case _: SqlIncrSumAggFunction => createIncrSumAggFunction(argTypes, index)
-
       case a: SqlMinMaxAggFunction if a.getKind == SqlKind.MIN =>
         createMinAggFunction(argTypes, index)
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
index cdd0191..3f79b07 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.plan.metadata
 
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
 import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankType}
@@ -171,16 +170,6 @@ class FlinkRelMdModifiedMonotonicityTest extends FlinkRelMdHandlerTestBase {
       mq.getRelModifiedMonotonicity(aggWithAvg)
     )
 
-    // incr_sum agg
-    val aggWithIncrSum = relBuilder.scan("MyTable3").aggregate(
-      relBuilder.groupKey(relBuilder.field("a")),
-      relBuilder.aggregateCall(FlinkSqlOperatorTable.INCR_SUM, false, null,
-        "incr_sum_b", relBuilder.field("b"))).build()
-    assertEquals(
-      new RelModifiedMonotonicity(Array(CONSTANT, INCREASING)),
-      mq.getRelModifiedMonotonicity(aggWithIncrSum)
-    )
-
     // test monotonicity lost because group by a agg field
     // select max_c, max(sum_d) as max_sum_d from (
     //   select a, b, max(c) as max_c, sum(d) as sum_d from MyTable4 group by a, b
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index e6624fb..bcea370 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -579,28 +579,6 @@ class RankTest extends TableTestBase {
   }
 
   @Test
-  def testTopNOrderByIncrSum(): Unit = {
-    val subquery =
-      """
-        |SELECT a, b, incr_sum(c) as sum_c
-        |FROM MyTable
-        |GROUP BY a, b
-      """.stripMargin
-
-    val sql =
-      s"""
-         |SELECT *
-         |FROM (
-         |  SELECT a, b, sum_c,
-         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num
-         |  FROM ($subquery))
-         |WHERE row_num <= 10
-      """.stripMargin
-
-    util.verifyPlanWithTrait(sql)
-  }
-
-  @Test
   def testNestedTopN(): Unit = {
     val subquery =
       """
@@ -661,58 +639,5 @@ class RankTest extends TableTestBase {
     util.verifyPlanWithTrait(sql)
   }
 
-  @Test
-  def testTopNWithoutRowNumber2(): Unit = {
-    util.addTableSource[(String, String, String, String, Long, String, Long, String)](
-      "stream_source",
-      'seller_id, 'sku_id, 'venture, 'stat_date, 'trd_amt, 'trd_buyer_id, 'log_pv, 'log_visitor_id)
-
-    val group_sql =
-      """
-        |SELECT
-        |    seller_id
-        |    ,sku_id
-        |    ,venture
-        |    ,stat_date
-        |    ,incr_sum(trd_amt) AS amt_dtr
-        |    ,COUNT(DISTINCT trd_buyer_id) AS byr_cnt_dtr
-        |    ,SUM(log_pv) AS pv_dtr
-        |    ,COUNT(DISTINCT log_visitor_id) AS uv_dtr
-        |FROM stream_source
-        |GROUP BY seller_id,sku_id,venture,stat_date
-      """.stripMargin
-
-    val sql =
-      s"""
-         |SELECT
-         |    CONCAT(seller_id, venture, stat_date, sku_id) as rowkey,
-         |    seller_id,
-         |    sku_id,
-         |    venture,
-         |    stat_date,
-         |    amt_dtr,
-         |    byr_cnt_dtr,
-         |    pv_dtr,
-         |    uv_dtr
-         |FROM (
-         |  SELECT
-         |        seller_id,
-         |        sku_id,
-         |        venture,
-         |        stat_date,
-         |        amt_dtr,
-         |        byr_cnt_dtr,
-         |        pv_dtr,
-         |        uv_dtr,
-         |        ROW_NUMBER() OVER (PARTITION BY seller_id, venture, stat_date
-         |           ORDER BY amt_dtr DESC) AS rownum
-         |  FROM ($group_sql)
-         |)
-         |WHERE rownum <= 10
-      """.stripMargin
-
-    util.verifyPlanWithTrait(sql)
-  }
-
   // TODO add tests about multi-sinks and udf
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
index b5c51e1..d9ccb89 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset
 import org.apache.flink.table.planner.utils.{CountAggFunction, IntSumAggFunction}
 
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 import scala.collection.Seq
 
@@ -86,12 +86,4 @@ class SortDistinctAggregateITCase extends DistinctAggregateITCaseBase {
     )
   }
 
-  @Ignore
-  @Test
-  def testApproximateCountDistinct(): Unit = {
-    checkResult(
-      "SELECT APPROX_COUNT_DISTINCT(b) FROM Table3",
-      Seq(row(6))
-    )
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 75178e5..7d8796d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -398,42 +398,6 @@ class AggregateITCase(
   }
 
   @Test
-  def testIncrSum(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "A"))
-    data.+=((-2, 2L, "B"))
-    data.+=((3, 2L, "B"))
-    data.+=((-4, 3L, "C"))
-    data.+=((5, 3L, "C"))
-    data.+=((6, 3L, "C"))
-    data.+=((-7, 4L, "B"))
-    data.+=((8, 4L, "A"))
-    data.+=((9, 4L, "D"))
-    data.+=((10, 4L, "E"))
-    data.+=((-11, 5L, "A"))
-    data.+=((12, 5L, "B"))
-
-    val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c)
-    tEnv.registerTable("T", t)
-
-    val sql =
-      """
-        |SELECT b, incr_sum(a)
-        |FROM T
-        |GROUP BY b
-      """.stripMargin
-
-    val t1 = tEnv.sqlQuery(sql)
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("1,1", "2,3", "3,11", "4,27", "5,12")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-
-  }
-
-  @Test
   def testNestedGroupByAgg(): Unit = {
     val data = new mutable.MutableList[(Int, Long, String)]
     data.+=((1, 1L, "A"))


[flink] 01/03: [FLINK-13529][table-planner-blink] Rename CONCAT_AGG to LISTAGG and fix the behavior according to ANSI SQL

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 85e0538a0041472551164a39126e71f295575c56
Author: beyond1920 <be...@126.com>
AuthorDate: Thu Aug 1 16:55:23 2019 +0800

    [FLINK-13529][table-planner-blink] Rename CONCAT_AGG to LISTAGG and fix the behavior according to ANSI SQL
    
    According to ANSI SQL, the LISTAGG function transforms values from a group of rows into a single list of values delimited by a configurable separator.
    
    This closes #9316
---
 ...ConcatAggFunction.java => ListAggFunction.java} | 14 +++---
 ...ion.java => ListAggWithRetractAggFunction.java} | 30 ++++++------
 ...n.java => ListAggWsWithRetractAggFunction.java} | 30 ++++++------
 .../functions/sql/FlinkSqlOperatorTable.java       |  4 +-
 ...catAggFunction.java => SqlListAggFunction.java} | 27 +++++++----
 .../plan/rules/logical/SplitAggregateRule.scala    |  4 +-
 .../planner/plan/utils/AggFunctionFactory.scala    | 22 ++++-----
 .../table/planner/plan/utils/AggregateUtil.scala   |  4 +-
 ...java => ListAggWithRetractAggFunctionTest.java} | 16 +++----
 ...va => ListAggWsWithRetractAggFunctionTest.java} | 36 +++++++-------
 .../apache/flink/table/api/stream/ExplainTest.xml  | 48 +++++++++----------
 .../planner/plan/batch/sql/RemoveCollationTest.xml | 16 +++----
 .../plan/rules/logical/SplitAggregateRuleTest.xml  |  8 ++--
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 40 ++++++++--------
 .../plan/stream/sql/agg/DistinctAggregateTest.xml  | 34 ++++++-------
 .../stream/sql/agg/IncrementalAggregateTest.xml    | 10 ++--
 .../flink/table/api/stream/ExplainTest.scala       |  4 +-
 .../plan/batch/sql/RemoveCollationTest.scala       |  4 +-
 .../rules/logical/SplitAggregateRuleTest.scala     |  2 +-
 .../stream/sql/MiniBatchIntervalInferTest.scala    |  6 +--
 .../stream/sql/agg/DistinctAggregateTest.scala     |  2 +-
 .../table/validation/AggregateValidationTest.scala | 55 ++++++++++++++++++----
 .../runtime/batch/sql/agg/SortAggITCase.scala      | 11 ++---
 .../runtime/stream/sql/AggregateITCase.scala       | 20 ++++----
 .../runtime/stream/table/AggregateITCase.scala     |  2 +-
 25 files changed, 245 insertions(+), 204 deletions(-)
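
Concretely, for the changed default delimiter, a sketch (group contents
assumed; the expected strings follow the updated
ListAggWithRetractAggFunctionTest below):

    -- rows in the group: ('a'), ('b'), ('c')
    SELECT LISTAGG(c) FROM T;       -- 'a,b,c'  (default delimiter is now ',', was '\n')
    SELECT LISTAGG(c, '#') FROM T;  -- 'a#b#c'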

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java
similarity index 91%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java
index 2e8f5fb..377b251 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java
@@ -32,23 +32,23 @@ import static org.apache.flink.table.planner.expressions.ExpressionBuilder.liter
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
 
 /**
- * built-in concat aggregate function.
+ * built-in listagg aggregate function.
  */
-public class ConcatAggFunction extends DeclarativeAggregateFunction {
+public class ListAggFunction extends DeclarativeAggregateFunction {
 	private int operandCount;
 	private UnresolvedReferenceExpression acc = unresolvedRef("concatAcc");
 	private UnresolvedReferenceExpression accDelimiter = unresolvedRef("accDelimiter");
 	private Expression delimiter;
 	private Expression operand;
 
-	public ConcatAggFunction(int operandCount) {
+	public ListAggFunction(int operandCount) {
 		this.operandCount = operandCount;
 		if (operandCount == 1) {
-			delimiter = literal("\n", DataTypes.STRING());
+			delimiter = literal(",", DataTypes.STRING());
 			operand = operand(0);
 		} else {
-			delimiter = operand(0);
-			operand = operand(1);
+			delimiter = operand(1);
+			operand = operand(0);
 		}
 	}
 
@@ -75,7 +75,7 @@ public class ConcatAggFunction extends DeclarativeAggregateFunction {
 	@Override
 	public Expression[] initialValuesExpressions() {
 		return new Expression[] {
-				/* delimiter */ literal("\n", DataTypes.STRING()),
+				/* delimiter */ literal(",", DataTypes.STRING()),
 				/* acc */ nullOf(DataTypes.STRING())
 		};
 	}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
similarity index 77%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
index ea5857a..8840844 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
@@ -31,18 +31,18 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * built-in concat with retraction aggregate function.
+ * built-in listagg with retraction aggregate function.
  */
-public final class ConcatWithRetractAggFunction
-	extends AggregateFunction<BinaryString, ConcatWithRetractAggFunction.ConcatWithRetractAccumulator> {
+public final class ListAggWithRetractAggFunction
+	extends AggregateFunction<BinaryString, ListAggWithRetractAggFunction.ListAggWithRetractAccumulator> {
 
 	private static final long serialVersionUID = -2836795091288790955L;
-	private static final BinaryString lineDelimiter = BinaryString.fromString("\n");
+	private static final BinaryString lineDelimiter = BinaryString.fromString(",");
 
 	/**
-	 * The initial accumulator for concat with retraction aggregate function.
+	 * The initial accumulator for listagg with retraction aggregate function.
 	 */
-	public static class ConcatWithRetractAccumulator {
+	public static class ListAggWithRetractAccumulator {
 		public ListView<BinaryString> list = new ListView<>(BinaryStringTypeInfo.INSTANCE);
 		public ListView<BinaryString> retractList = new ListView<>(BinaryStringTypeInfo.INSTANCE);
 
@@ -55,25 +55,25 @@ public final class ConcatWithRetractAggFunction
 			if (o == null || getClass() != o.getClass()) {
 				return false;
 			}
-			ConcatWithRetractAccumulator that = (ConcatWithRetractAccumulator) o;
+			ListAggWithRetractAccumulator that = (ListAggWithRetractAccumulator) o;
 			return Objects.equals(list, that.list) &&
 				Objects.equals(retractList, that.retractList);
 		}
 	}
 
 	@Override
-	public ConcatWithRetractAccumulator createAccumulator() {
-		return new ConcatWithRetractAccumulator();
+	public ListAggWithRetractAccumulator createAccumulator() {
+		return new ListAggWithRetractAccumulator();
 	}
 
-	public void accumulate(ConcatWithRetractAccumulator acc, BinaryString value) throws Exception {
+	public void accumulate(ListAggWithRetractAccumulator acc, BinaryString value) throws Exception {
 		// ignore null value
 		if (value != null) {
 			acc.list.add(value);
 		}
 	}
 
-	public void retract(ConcatWithRetractAccumulator acc, BinaryString value) throws Exception {
+	public void retract(ListAggWithRetractAccumulator acc, BinaryString value) throws Exception {
 		if (value != null) {
 			if (!acc.list.remove(value)) {
 				acc.retractList.add(value);
@@ -81,8 +81,8 @@ public final class ConcatWithRetractAggFunction
 		}
 	}
 
-	public void merge(ConcatWithRetractAccumulator acc, Iterable<ConcatWithRetractAccumulator> its) throws Exception {
-		for (ConcatWithRetractAccumulator otherAcc : its) {
+	public void merge(ListAggWithRetractAccumulator acc, Iterable<ListAggWithRetractAccumulator> its) throws Exception {
+		for (ListAggWithRetractAccumulator otherAcc : its) {
 			// merge list of acc and other
 			List<BinaryString> buffer = new ArrayList<>();
 			for (BinaryString binaryString : acc.list.get()) {
@@ -117,7 +117,7 @@ public final class ConcatWithRetractAggFunction
 	}
 
 	@Override
-	public BinaryString getValue(ConcatWithRetractAccumulator acc) {
+	public BinaryString getValue(ListAggWithRetractAccumulator acc) {
 		try {
 			Iterable<BinaryString> accList = acc.list.get();
 			if (accList == null || !accList.iterator().hasNext()) {
@@ -131,7 +131,7 @@ public final class ConcatWithRetractAggFunction
 		}
 	}
 
-	public void resetAccumulator(ConcatWithRetractAccumulator acc) {
+	public void resetAccumulator(ListAggWithRetractAccumulator acc) {
 		acc.list.clear();
 		acc.retractList.clear();
 	}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
similarity index 77%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
index a968cb4..317f97e 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
@@ -31,20 +31,20 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * built-in concatWs with retraction aggregate function.
+ * built-in listAggWs with retraction aggregate function.
  */
-public final class ConcatWsWithRetractAggFunction
-	extends AggregateFunction<BinaryString, ConcatWsWithRetractAggFunction.ConcatWsWithRetractAccumulator> {
+public final class ListAggWsWithRetractAggFunction
+	extends AggregateFunction<BinaryString, ListAggWsWithRetractAggFunction.ListAggWsWithRetractAccumulator> {
 
 	private static final long serialVersionUID = -8627988150350160473L;
 
 	/**
 	 * The initial accumulator for concat with retraction aggregate function.
 	 */
-	public static class ConcatWsWithRetractAccumulator {
+	public static class ListAggWsWithRetractAccumulator {
 		public ListView<BinaryString> list = new ListView<>(BinaryStringTypeInfo.INSTANCE);
 		public ListView<BinaryString> retractList = new ListView<>(BinaryStringTypeInfo.INSTANCE);
-		public BinaryString delimiter = BinaryString.fromString("\n");
+		public BinaryString delimiter = BinaryString.fromString(",");
 
 		@VisibleForTesting
 		@Override
@@ -55,7 +55,7 @@ public final class ConcatWsWithRetractAggFunction
 			if (o == null || getClass() != o.getClass()) {
 				return false;
 			}
-			ConcatWsWithRetractAccumulator that = (ConcatWsWithRetractAccumulator) o;
+			ListAggWsWithRetractAccumulator that = (ListAggWsWithRetractAccumulator) o;
 			return Objects.equals(list, that.list) &&
 				Objects.equals(retractList, that.retractList) &&
 				Objects.equals(delimiter, that.delimiter);
@@ -63,11 +63,11 @@ public final class ConcatWsWithRetractAggFunction
 	}
 
 	@Override
-	public ConcatWsWithRetractAccumulator createAccumulator() {
-		return new ConcatWsWithRetractAccumulator();
+	public ListAggWsWithRetractAccumulator createAccumulator() {
+		return new ListAggWsWithRetractAccumulator();
 	}
 
-	public void accumulate(ConcatWsWithRetractAccumulator acc, BinaryString lineDelimiter, BinaryString value) throws Exception {
+	public void accumulate(ListAggWsWithRetractAccumulator acc, BinaryString value, BinaryString lineDelimiter) throws Exception {
 		// ignore null value
 		if (value != null) {
 			acc.delimiter = lineDelimiter;
@@ -75,7 +75,7 @@ public final class ConcatWsWithRetractAggFunction
 		}
 	}
 
-	public void retract(ConcatWsWithRetractAccumulator acc, BinaryString lineDelimiter, BinaryString value) throws Exception {
+	public void retract(ListAggWsWithRetractAccumulator acc, BinaryString value, BinaryString lineDelimiter) throws Exception {
 		if (value != null) {
 			acc.delimiter = lineDelimiter;
 			if (!acc.list.remove(value)) {
@@ -84,8 +84,8 @@ public final class ConcatWsWithRetractAggFunction
 		}
 	}
 
-	public void merge(ConcatWsWithRetractAccumulator acc, Iterable<ConcatWsWithRetractAccumulator> its) throws Exception {
-		for (ConcatWsWithRetractAccumulator otherAcc : its) {
+	public void merge(ListAggWsWithRetractAccumulator acc, Iterable<ListAggWsWithRetractAccumulator> its) throws Exception {
+		for (ListAggWsWithRetractAccumulator otherAcc : its) {
 			if (!otherAcc.list.get().iterator().hasNext()
 				&& !otherAcc.retractList.get().iterator().hasNext()) {
 				// otherAcc is empty, skip it
@@ -127,7 +127,7 @@ public final class ConcatWsWithRetractAggFunction
 	}
 
 	@Override
-	public BinaryString getValue(ConcatWsWithRetractAccumulator acc) {
+	public BinaryString getValue(ListAggWsWithRetractAccumulator acc) {
 		try {
 			Iterable<BinaryString> accList = acc.list.get();
 			if (accList == null || !accList.iterator().hasNext()) {
@@ -141,8 +141,8 @@ public final class ConcatWsWithRetractAggFunction
 		}
 	}
 
-	public void resetAccumulator(ConcatWsWithRetractAccumulator acc) {
-		acc.delimiter = BinaryString.fromString("\n");
+	public void resetAccumulator(ListAggWsWithRetractAccumulator acc) {
+		acc.delimiter = BinaryString.fromString(",");
 		acc.list.clear();
 		acc.retractList.clear();
 	}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 50ff193..dbabb69 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -946,9 +946,9 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFirstLastValueAggFunction LAST_VALUE = new SqlFirstLastValueAggFunction(SqlKind.LAST_VALUE);
 
 	/**
-	 * <code>CONCAT_AGG</code> aggregate function.
+	 * <code>LISTAGG</code> aggregate function.
 	 */
-	public static final SqlConcatAggFunction CONCAT_AGG = new SqlConcatAggFunction();
+	public static final SqlListAggFunction LISTAGG = new SqlListAggFunction();
 
 	/**
 	 * <code>INCR_SUM</code> aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java
similarity index 71%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java
index 7e215e0..9557f0c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java
@@ -25,30 +25,37 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.type.SqlTypeTransforms;
 
 import java.util.List;
 
 /**
- * <code>CONCAT_AGG</code> aggregate function returns the concatenation of
+ * <code>LISTAGG</code> aggregate function returns the concatenation of
  * a list of values that are input to the function.
+ *
+ * <p>NOTE: The differences between this and {@link SqlStdOperatorTable#LISTAGG} are:
+ * (1) the second parameter is constrained to be a character literal;
+ * (2) an OVER clause is not required to use this aggregate function.
  */
-public class SqlConcatAggFunction extends SqlAggFunction {
+public class SqlListAggFunction extends SqlAggFunction {
 
-	public SqlConcatAggFunction() {
-		super("CONCAT_AGG",
+	public SqlListAggFunction() {
+		super("LISTAGG",
 				null,
-				SqlKind.OTHER_FUNCTION,
-				ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+				SqlKind.LISTAGG,
+				ReturnTypes.ARG0_NULLABLE,
 				null,
 				OperandTypes.or(
 						OperandTypes.CHARACTER,
-						OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
-				SqlFunctionCategory.STRING,
+						OperandTypes.sequence(
+								"'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)'",
+								OperandTypes.CHARACTER,
+								OperandTypes.and(OperandTypes.CHARACTER, OperandTypes.LITERAL)
+						)),
+				SqlFunctionCategory.SYSTEM,
 				false,
 				false);
 	}
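
The operand checker above admits either a single character-typed argument or a
(character, character literal) pair; a sketch of calls it accepts and rejects
(table and column names assumed):

    SELECT LISTAGG(name) FROM T;         -- accepted: delimiter defaults to ','
    SELECT LISTAGG(name, ';') FROM T;    -- accepted: delimiter is a character literal
    SELECT LISTAGG(name, delim) FROM T;  -- rejected: the delimiter must be a literal
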
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index b2bd3a5..524d5a3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -334,8 +334,8 @@ object SplitAggregateRule {
       (Seq(FlinkSqlOperatorTable.FIRST_VALUE), Seq(FlinkSqlOperatorTable.FIRST_VALUE)),
     FlinkSqlOperatorTable.LAST_VALUE ->
       (Seq(FlinkSqlOperatorTable.LAST_VALUE), Seq(FlinkSqlOperatorTable.LAST_VALUE)),
-    FlinkSqlOperatorTable.CONCAT_AGG ->
-      (Seq(FlinkSqlOperatorTable.CONCAT_AGG), Seq(FlinkSqlOperatorTable.CONCAT_AGG)),
+    FlinkSqlOperatorTable.LISTAGG ->
+      (Seq(FlinkSqlOperatorTable.LISTAGG), Seq(FlinkSqlOperatorTable.LISTAGG)),
     SINGLE_VALUE -> (Seq(SINGLE_VALUE), Seq(SINGLE_VALUE))
   )
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 4dd3439..a8e00d3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.planner.functions.aggfunctions.MinWithRetractAggFu
 import org.apache.flink.table.planner.functions.aggfunctions.SingleValueAggFunction._
 import org.apache.flink.table.planner.functions.aggfunctions.SumWithRetractAggFunction._
 import org.apache.flink.table.planner.functions.aggfunctions._
-import org.apache.flink.table.planner.functions.sql.{SqlConcatAggFunction, SqlFirstLastValueAggFunction, SqlIncrSumAggFunction}
+import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, SqlFirstLastValueAggFunction, SqlIncrSumAggFunction}
 import org.apache.flink.table.planner.functions.utils.AggSqlFunction
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
 import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo
@@ -113,11 +113,11 @@ class AggFunctionFactory(
       case a: SqlFirstLastValueAggFunction if a.getKind == SqlKind.LAST_VALUE =>
         createLastValueAggFunction(argTypes, index)
 
-      case _: SqlConcatAggFunction if call.getArgList.size() == 1 =>
-        createConcatAggFunction(argTypes, index)
+      case _: SqlListAggFunction if call.getArgList.size() == 1 =>
+        createListAggFunction(argTypes, index)
 
-      case _: SqlConcatAggFunction if call.getArgList.size() == 2 =>
-        createConcatWsAggFunction(argTypes, index)
+      case _: SqlListAggFunction if call.getArgList.size() == 2 =>
+        createListAggWsFunction(argTypes, index)
 
       // TODO supports SqlCardinalityCountAggFunction
 
@@ -606,23 +606,23 @@ class AggFunctionFactory(
     }
   }
 
-  private def createConcatAggFunction(
+  private def createListAggFunction(
       argTypes: Array[LogicalType],
       index: Int): UserDefinedFunction = {
     if (needRetraction(index)) {
-      new ConcatWithRetractAggFunction
+      new ListAggWithRetractAggFunction
     } else {
-      new ConcatAggFunction(1)
+      new ListAggFunction(1)
     }
   }
 
-  private def createConcatWsAggFunction(
+  private def createListAggWsFunction(
       argTypes: Array[LogicalType],
       index: Int): UserDefinedFunction = {
     if (needRetraction(index)) {
-      new ConcatWsWithRetractAggFunction
+      new ListAggWsWithRetractAggFunction
     } else {
-      new ConcatAggFunction(2)
+      new ListAggFunction(2)
     }
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 544555a..8877be5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.planner.dataview.DataViewUtils.useNullSerializerFo
 import org.apache.flink.table.planner.dataview.{DataViewSpec, MapViewSpec}
 import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart, RexNodeConverter}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
-import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlConcatAggFunction, SqlFirstLastValueAggFunction}
+import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlListAggFunction, SqlFirstLastValueAggFunction}
 import org.apache.flink.table.planner.functions.utils.AggSqlFunction
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
@@ -541,7 +541,7 @@ object AggregateUtil extends Enumeration {
              _: SqlSumAggFunction |
              _: SqlSumEmptyIsZeroAggFunction |
              _: SqlSingleValueAggFunction |
-             _: SqlConcatAggFunction => true
+             _: SqlListAggFunction => true
         case _: SqlFirstLastValueAggFunction => aggCall.getArgList.size() == 1
         case _ => false
       }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java
similarity index 79%
rename from flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java
rename to flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java
index 725d802..5c5566e 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java
@@ -20,17 +20,17 @@ package org.apache.flink.table.planner.functions.aggfunctions;
 
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.planner.functions.aggfunctions.ConcatWithRetractAggFunction.ConcatWithRetractAccumulator;
+import org.apache.flink.table.planner.functions.aggfunctions.ListAggWithRetractAggFunction.ListAggWithRetractAccumulator;
 
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.List;
 
 /**
- * Test case for built-in concat with retraction aggregate function.
+ * Test case for built-in LISTAGG with retraction aggregate function.
  */
-public class ConcatWithRetractAggFunctionTest
-	extends AggFunctionTestBase<BinaryString, ConcatWithRetractAccumulator> {
+public class ListAggWithRetractAggFunctionTest
+	extends AggFunctionTestBase<BinaryString, ListAggWithRetractAccumulator> {
 
 	@Override
 	protected List<List<BinaryString>> getInputValueSets() {
@@ -53,14 +53,14 @@ public class ConcatWithRetractAggFunctionTest
 	@Override
 	protected List<BinaryString> getExpectedResults() {
 		return Arrays.asList(
-				BinaryString.fromString("a\nb\nc\nd\ne\nf"),
+				BinaryString.fromString("a,b,c,d,e,f"),
 				null,
 				BinaryString.fromString("a"));
 	}
 
 	@Override
-	protected AggregateFunction<BinaryString, ConcatWithRetractAccumulator> getAggregator() {
-		return new ConcatWithRetractAggFunction();
+	protected AggregateFunction<BinaryString, ListAggWithRetractAccumulator> getAggregator() {
+		return new ListAggWithRetractAggFunction();
 	}
 
 	@Override
@@ -75,6 +75,6 @@ public class ConcatWithRetractAggFunctionTest
 
 	@Override
 	protected Class<?> getAccClass() {
-		return ConcatWithRetractAccumulator.class;
+		return ListAggWithRetractAccumulator.class;
 	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java
similarity index 78%
rename from flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java
rename to flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java
index d9e06d0..67ee8ff 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.functions.aggfunctions;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.planner.functions.aggfunctions.ConcatWsWithRetractAggFunction.ConcatWsWithRetractAccumulator;
+import org.apache.flink.table.planner.functions.aggfunctions.ListAggWsWithRetractAggFunction.ListAggWsWithRetractAccumulator;
 import org.apache.flink.util.Preconditions;
 
 import java.lang.reflect.InvocationTargetException;
@@ -34,8 +34,8 @@ import static org.junit.Assert.assertEquals;
 /**
  * Test case for built-in concatWs with retraction aggregate function.
  */
-public class ConcatWsWithRetractAggFunctionTest
-	extends AggFunctionTestBase<BinaryString, ConcatWsWithRetractAccumulator> {
+public class ListAggWsWithRetractAggFunctionTest
+	extends AggFunctionTestBase<BinaryString, ListAggWsWithRetractAccumulator> {
 
 	@Override
 	protected List<List<BinaryString>> getInputValueSets() {
@@ -84,8 +84,8 @@ public class ConcatWsWithRetractAggFunctionTest
 	}
 
 	@Override
-	protected AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> getAggregator() {
-		return new ConcatWsWithRetractAggFunction();
+	protected AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> getAggregator() {
+		return new ListAggWsWithRetractAggFunction();
 	}
 
 	@Override
@@ -101,14 +101,14 @@ public class ConcatWsWithRetractAggFunctionTest
 
 	@Override
 	protected Class<?> getAccClass() {
-		return ConcatWsWithRetractAccumulator.class;
+		return ListAggWsWithRetractAccumulator.class;
 	}
 
 	@Override
 	protected <E> void validateResult(E expected, E result) {
-		if (expected instanceof ConcatWsWithRetractAccumulator && result instanceof ConcatWsWithRetractAccumulator) {
-			ConcatWsWithRetractAccumulator e = (ConcatWsWithRetractAccumulator) expected;
-			ConcatWsWithRetractAccumulator r = (ConcatWsWithRetractAccumulator) result;
+		if (expected instanceof ListAggWsWithRetractAccumulator && result instanceof ListAggWsWithRetractAccumulator) {
+			ListAggWsWithRetractAccumulator e = (ListAggWsWithRetractAccumulator) expected;
+			ListAggWsWithRetractAccumulator r = (ListAggWsWithRetractAccumulator) result;
 			assertEquals(e.list, r.list);
 			assertEquals(e.list, r.list);
 		} else {
@@ -117,31 +117,31 @@ public class ConcatWsWithRetractAggFunctionTest
 	}
 
 	@Override
-	protected ConcatWsWithRetractAccumulator accumulateValues(List<BinaryString> values)
+	protected ListAggWsWithRetractAccumulator accumulateValues(List<BinaryString> values)
 			throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-		AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> aggregator = getAggregator();
-		ConcatWsWithRetractAccumulator accumulator = getAggregator().createAccumulator();
+		AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> aggregator = getAggregator();
+		ListAggWsWithRetractAccumulator accumulator = getAggregator().createAccumulator();
 		Method accumulateFunc = getAccumulateFunc();
 		Preconditions.checkArgument(values.size() % 2 == 0,
 				"number of values must be an integer multiple of 2.");
 		for (int i = 0; i < values.size(); i += 2) {
-			BinaryString value = values.get(i);
-			BinaryString delimiter = values.get(i + 1);
+			BinaryString value = values.get(i + 1);
+			BinaryString delimiter = values.get(i);
 			accumulateFunc.invoke(aggregator, accumulator, delimiter, value);
 		}
 		return accumulator;
 	}
 
 	@Override
-	protected void retractValues(ConcatWsWithRetractAccumulator accumulator, List<BinaryString> values)
+	protected void retractValues(ListAggWsWithRetractAccumulator accumulator, List<BinaryString> values)
 			throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-		AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> aggregator = getAggregator();
+		AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> aggregator = getAggregator();
 		Method retractFunc = getRetractFunc();
 		Preconditions.checkArgument(values.size() % 2 == 0,
 				"number of values must be an integer multiple of 2.");
 		for (int i = 0; i < values.size(); i += 2) {
-			BinaryString value = values.get(i);
-			BinaryString delimiter = values.get(i + 1);
+			BinaryString value = values.get(i + 1);
+			BinaryString delimiter = values.get(i);
 			retractFunc.invoke(aggregator, accumulator, delimiter, value);
 		}
 	}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
index fd84d11..16d3278 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
@@ -637,8 +637,8 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], updateAsRetraction=[false], ac
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(name=[appendSink1], fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], $f2=[_UTF-16LE'#'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'#'])
          +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
             +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -649,8 +649,8 @@ LogicalSink(name=[appendSink1], fields=[a, b])
 
 LogicalSink(name=[appendSink2], fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'*'])
          +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
             +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -670,15 +670,15 @@ Calc(select=[id1, rowtime AS ts, text], updateAsRetraction=[true], accMode=[Acc]
          +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
 
 Sink(name=[appendSink1], fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
    +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-      +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+      +- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
          +- Reused(reference_id=[1])
 
 Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
    +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-      +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+      +- Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
          +- Reused(reference_id=[1])
 
 == Physical Execution Plan ==
@@ -713,11 +713,11 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text))
+								content : Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3))
 								ship_strategy : FORWARD
 
 								 : Operator
-									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
 									ship_strategy : HASH
 
 									 : Operator
@@ -725,11 +725,11 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc
 										ship_strategy : FORWARD
 
 										 : Operator
-											content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text))
+											content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3))
 											ship_strategy : FORWARD
 
 											 : Operator
-												content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+												content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
 												ship_strategy : HASH
 
 												 : Operator
@@ -784,8 +784,8 @@ Union(all=[true], union=[a, b, c])
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(name=[appendSink1], fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], $f2=[_UTF-16LE'#'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'#'])
          +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
             +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -796,8 +796,8 @@ LogicalSink(name=[appendSink1], fields=[a, b])
 
 LogicalSink(name=[appendSink2], fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'*'])
          +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
             +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -817,15 +817,15 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
          +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime])
 
 Sink(name=[appendSink1], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text])
+      +- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3])
          +- Reused(reference_id=[1])
 
 Sink(name=[appendSink2], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text])
+      +- Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3])
          +- Reused(reference_id=[1])
 
 == Physical Execution Plan ==
@@ -860,11 +860,11 @@ Sink(name=[appendSink2], fields=[a, b])
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text))
+								content : Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3))
 								ship_strategy : FORWARD
 
 								 : Operator
-									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
 									ship_strategy : HASH
 
 									 : Operator
@@ -872,11 +872,11 @@ Sink(name=[appendSink2], fields=[a, b])
 										ship_strategy : FORWARD
 
 										 : Operator
-											content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text))
+											content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3))
 											ship_strategy : FORWARD
 
 											 : Operator
-												content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+												content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
 												ship_strategy : HASH
 
 												 : Operator
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
index fd8003b..76b6419 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
@@ -310,7 +310,7 @@ v2 as (
 ),
 
 join_tb2 as (
- select tb1_id, concat_agg(tb2_name, ',') as tb2_names
+ select tb1_id, LISTAGG(tb2_name, ',') as tb2_names
  from (
   select v1.id as tb1_id, tb2.name as tb2_name
    from v1 left outer join tb2 on tb2_id = tb2.id
@@ -318,7 +318,7 @@ join_tb2 as (
 ),
 
 join_tb3 as (
- select tb1_id, concat_agg(tb3_name, ',') as tb3_names
+ select tb1_id, LISTAGG(tb3_name, ',') as tb3_names
  from (
   select v2.id as tb1_id, tb3.name as tb3_name
    from v2 left outer join tb3 on tb3_id = tb3.id
@@ -349,7 +349,7 @@ LogicalProject(id=[$0], tb2_ids=[$2], tb3_ids=[$3], name=[$4], tb2_names=[$6], t
    :  :- LogicalJoin(condition=[=($0, $7)], joinType=[left])
    :  :  :- LogicalJoin(condition=[=($0, $5)], joinType=[left])
    :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]])
-   :  :  :  +- LogicalAggregate(group=[{0}], tb2_names=[CONCAT_AGG($1, $2)])
+   :  :  :  +- LogicalAggregate(group=[{0}], tb2_names=[LISTAGG($1, $2)])
    :  :  :     +- LogicalProject(tb1_id=[$0], tb2_name=[$3], $f2=[_UTF-16LE','])
    :  :  :        +- LogicalJoin(condition=[=($1, $2)], joinType=[left])
    :  :  :           :- LogicalProject(id=[$0], tb2_id=[$5])
@@ -357,7 +357,7 @@ LogicalProject(id=[$0], tb2_ids=[$2], tb3_ids=[$3], name=[$4], tb2_names=[$6], t
    :  :  :           :     :- LogicalTableScan(table=[[default_catalog, default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]])
    :  :  :           :     +- LogicalTableFunctionScan(invocation=[split($cor0.tb2_ids)], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;])
    :  :  :           +- LogicalTableScan(table=[[default_catalog, default_database, tb2, source: [TestTableSource(id, name)]]])
-   :  :  +- LogicalAggregate(group=[{0}], tb3_names=[CONCAT_AGG($1, $2)])
+   :  :  +- LogicalAggregate(group=[{0}], tb3_names=[LISTAGG($1, $2)])
    :  :     +- LogicalProject(tb1_id=[$0], tb3_name=[$3], $f2=[_UTF-16LE','])
    :  :        +- LogicalJoin(condition=[=($1, $2)], joinType=[left])
    :  :           :- LogicalProject(id=[$0], tb3_id=[$5])
@@ -382,10 +382,10 @@ Calc(select=[id, tb2_ids, tb3_ids, name, tb2_names, tb3_names, name0, name1])
    :     :        :  +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(id, tb1_id)], select=[id, key, tb2_ids, tb3_ids, name, tb1_id, tb2_names], rightSorted=[true])
    :     :        :     :- Exchange(distribution=[hash[id]])
    :     :        :     :  +- TableSourceScan(table=[[default_catalog, default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]], fields=[id, key, tb2_ids, tb3_ids, name], reuse_id=[1])
-   :     :        :     +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_CONCAT_AGG(accDelimiter$0, concatAcc$1) AS tb2_names])
+   :     :        :     +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_LISTAGG(accDelimiter$0, concatAcc$1) AS tb2_names])
    :     :        :        +- Sort(orderBy=[tb1_id ASC])
    :     :        :           +- Exchange(distribution=[hash[tb1_id]])
-   :     :        :              +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_CONCAT_AGG(tb2_name, $f2) AS (accDelimiter$0, concatAcc$1)])
+   :     :        :              +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_LISTAGG(tb2_name, $f2) AS (accDelimiter$0, concatAcc$1)])
    :     :        :                 +- Sort(orderBy=[tb1_id ASC])
    :     :        :                    +- Calc(select=[id AS tb1_id, name AS tb2_name, _UTF-16LE',' AS $f2])
    :     :        :                       +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(tb2_id, id0)], select=[id, tb2_id, id0, name])
@@ -395,10 +395,10 @@ Calc(select=[id, tb2_ids, tb3_ids, name, tb2_names, tb3_names, name0, name1])
    :     :        :                          :        +- Reused(reference_id=[1])
    :     :        :                          +- Exchange(distribution=[hash[id]])
    :     :        :                             +- TableSourceScan(table=[[default_catalog, default_database, tb2, source: [TestTableSource(id, name)]]], fields=[id, name])
-   :     :        +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_CONCAT_AGG(accDelimiter$0, concatAcc$1) AS tb3_names])
+   :     :        +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_LISTAGG(accDelimiter$0, concatAcc$1) AS tb3_names])
    :     :           +- Sort(orderBy=[tb1_id ASC])
    :     :              +- Exchange(distribution=[hash[tb1_id]])
-   :     :                 +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_CONCAT_AGG(tb3_name, $f2) AS (accDelimiter$0, concatAcc$1)])
+   :     :                 +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_LISTAGG(tb3_name, $f2) AS (accDelimiter$0, concatAcc$1)])
    :     :                    +- Sort(orderBy=[tb1_id ASC])
    :     :                       +- Calc(select=[id AS tb1_id, name AS tb3_name, _UTF-16LE',' AS $f2])
    :     :                          +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(tb3_id, id0)], select=[id, tb3_id, id0, name])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
index a57c307..b59a9c8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
@@ -171,19 +171,19 @@ FlinkLogicalAggregate(group=[{0}], agg#0=[MIN($3)], agg#1=[MAX($4)], agg#2=[SUM(
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalAggregate(group=[{0}], agg#0=[CONCAT_AGG($2)], agg#1=[$SUM0($3)])
-+- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[CONCAT_AGG($2) FILTER $4], agg#1=[COUNT(DISTINCT $1) FILTER $5])
+FlinkLogicalAggregate(group=[{0}], agg#0=[LISTAGG($2)], agg#1=[$SUM0($3)])
++- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[LISTAGG($2) FILTER $4], agg#1=[COUNT(DISTINCT $1) FILTER $5])
    +- FlinkLogicalCalc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
       +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
          +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 19d3208..9a1d03b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -220,11 +220,11 @@ Calc(select=[a, b])
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(name=[appendSink1], fields=[a, b])
 +- LogicalProject(id1=[$1], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'*'], text=[$1])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$0], text=[$1], $f3=[_UTF-16LE'*'])
          +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
-            +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)])
-               +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'#'], text=[$2])
+            +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+               +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
                   +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
                      +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                         +- LogicalJoin(condition=[true], joinType=[inner])
@@ -235,8 +235,8 @@ LogicalSink(name=[appendSink1], fields=[a, b])
 
 LogicalSink(name=[appendSink2], fields=[a, b])
 +- LogicalProject(id1=[$1], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'-'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'-'])
          +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
             +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -249,8 +249,8 @@ LogicalSink(name=[appendSink3], fields=[a, b])
 +- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
    +- LogicalProject(id1=[$0], text=[$1])
       +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
-         +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)])
-            +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'#'], text=[$2])
+         +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+            +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
                +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
                   +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                      +- LogicalJoin(condition=[true], joinType=[inner])
@@ -269,21 +269,21 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
       +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
          +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
 
-GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
+GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
 +- Exchange(distribution=[hash[id1]])
-   +- Calc(select=[ts, id1, _UTF-16LE'#' AS $f2, text])
+   +- Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3])
       +- Reused(reference_id=[1])
 
 Sink(name=[appendSink1], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text])
+      +- Calc(select=[w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3])
          +- Reused(reference_id=[2])
 
 Sink(name=[appendSink2], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[ts, id1, _UTF-16LE'-' AS $f2, text])
+      +- Calc(select=[ts, id1, text, _UTF-16LE'-' AS $f3])
          +- Reused(reference_id=[1])
 
 Sink(name=[appendSink3], fields=[a, b])
@@ -325,19 +325,19 @@ Sink(name=[appendSink3], fields=[a, b])
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(select: (ts, id1, _UTF-16LE'#' AS $f2, text))
+								content : Calc(select: (ts, id1, text, _UTF-16LE'#' AS $f3))
 								ship_strategy : FORWARD
 
 								 : Operator
-									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
+									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
 									ship_strategy : HASH
 
 									 : Operator
-										content : Calc(select: (w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text))
+										content : Calc(select: (w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3))
 										ship_strategy : FORWARD
 
 										 : Operator
-											content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+											content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
 											ship_strategy : HASH
 
 											 : Operator
@@ -345,11 +345,11 @@ Sink(name=[appendSink3], fields=[a, b])
 												ship_strategy : FORWARD
 
 												 : Operator
-													content : Calc(select: (ts, id1, _UTF-16LE'-' AS $f2, text))
+													content : Calc(select: (ts, id1, text, _UTF-16LE'-' AS $f3))
 													ship_strategy : FORWARD
 
 													 : Operator
-														content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+														content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
 														ship_strategy : HASH
 
 														 : Operator
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index e9ec761..dc74eb4 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -677,18 +677,18 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=false, aggPhaseEnforcer=ONE_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+GroupAggregate(groupBy=[a], select=[a, LISTAGG(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
    +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -697,20 +697,20 @@ GroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS EXPR$1, COUNT(DISTINCT b
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=false, aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GlobalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS EXPR$1, COUNT(distinct$0 count$2) AS EXPR$2])
+GlobalGroupAggregate(groupBy=[a], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS EXPR$1, COUNT(distinct$0 count$2) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- LocalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0])
+   +- LocalGroupAggregate(groupBy=[a], select=[a, LISTAGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0])
       +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -718,20 +718,20 @@ GlobalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatA
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETRACT($f2) AS $f1, $SUM0_RETRACT($f3_0) AS $f2])
+GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f2) AS $f1, $SUM0_RETRACT($f3_0) AS $f2])
 +- Exchange(distribution=[hash[a]])
-   +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f3_0])
+   +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f3_0])
       +- Exchange(distribution=[hash[a, $f3]])
          +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
             +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
@@ -743,23 +743,23 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETR
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETRACT(concat_agg$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT(listagg$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2])
 +- Exchange(distribution=[hash[a]])
-   +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETRACT($f2) AS concat_agg$0, $SUM0_RETRACT($f3_0) AS sum$1, COUNT_RETRACT(*) AS count1$2])
-      +- GlobalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS $f2, COUNT(distinct$0 count$2) AS $f3_0])
+   +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f2) AS listagg$0, $SUM0_RETRACT($f3_0) AS sum$1, COUNT_RETRACT(*) AS count1$2])
+      +- GlobalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG((accDelimiter$0, concatAcc$1)) AS $f2, COUNT(distinct$0 count$2) AS $f3_0])
          +- Exchange(distribution=[hash[a, $f3]])
-            +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
+            +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
                +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
                   +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
                      +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index a415121..ccef832 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -196,22 +196,22 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS $f1, $SUM0(count$2) AS $f2])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS $f1, $SUM0(count$2) AS $f2])
 +- Exchange(distribution=[hash[a]])
-   +- IncrementalGroupAggregate(partialAggGrouping=[a, $f3], finalAggGrouping=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 count$2) AS count$2])
+   +- IncrementalGroupAggregate(partialAggGrouping=[a, $f3], finalAggGrouping=[a], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 count$2) AS count$2])
       +- Exchange(distribution=[hash[a, $f3]])
-         +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
+         +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
             +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
                +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
                   +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index 9952e34..198fb0f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -131,7 +131,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
 
     val table1 = util.tableEnv.sqlQuery(
       """
-        |SELECT id1, CONCAT_AGG('#', text)
+        |SELECT id1, LISTAGG(text, '#')
         |FROM TempTable
         |GROUP BY id1, TUMBLE(ts, INTERVAL '8' SECOND)
       """.stripMargin)
@@ -140,7 +140,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
 
     val table2 = util.tableEnv.sqlQuery(
       """
-        |SELECT id1, CONCAT_AGG('*', text)
+        |SELECT id1, LISTAGG(text, '*')
         |FROM TempTable
         |GROUP BY id1, HOP(ts, INTERVAL '12' SECOND, INTERVAL '6' SECOND)
       """.stripMargin)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
index 73c202d..5a5bdf5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
@@ -307,7 +307,7 @@ class RemoveCollationTest extends TableTestBase {
         |),
         |
         |join_tb2 as (
-        | select tb1_id, concat_agg(tb2_name, ',') as tb2_names
+        | select tb1_id, LISTAGG(tb2_name, ',') as tb2_names
         | from (
         |  select v1.id as tb1_id, tb2.name as tb2_name
         |   from v1 left outer join tb2 on tb2_id = tb2.id
@@ -315,7 +315,7 @@ class RemoveCollationTest extends TableTestBase {
         |),
         |
         |join_tb3 as (
-        | select tb1_id, concat_agg(tb3_name, ',') as tb3_names
+        | select tb1_id, LISTAGG(tb3_name, ',') as tb3_names
         | from (
         |  select v2.id as tb1_id, tb3.name as tb3_name
         |   from v2 left outer join tb3 on tb3_id = tb3.id
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
index b91649b..5d58dfb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
@@ -95,7 +95,7 @@ class SplitAggregateRuleTest extends TableTestBase {
 
   @Test
   def testSingleConcatAggWithDistinctAgg(): Unit = {
-    util.verifyPlan("SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
+    util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
index ce9e042..9de3886 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -297,7 +297,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
     val table2 = util.tableEnv.sqlQuery(
       """
         |SELECT id1,
-        |    CONCAT_AGG('#', text) as text,
+        |    LISTAGG(text, '#') as text,
         |    TUMBLE_ROWTIME(ts, INTERVAL '6' SECOND) as ts
         |FROM TempTable1
         |GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1
@@ -307,7 +307,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   val table3 = util.tableEnv.sqlQuery(
       """
         |SELECT id1,
-        |    CONCAT_AGG('*', text)
+        |    LISTAGG(text, '*')
         |FROM TempTable2
         |GROUP BY HOP(ts, INTERVAL '12' SECOND, INTERVAL '4' SECOND), id1
       """.stripMargin)
@@ -317,7 +317,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
     val table4 = util.tableEnv.sqlQuery(
       """
         |SELECT id1,
-        |    CONCAT_AGG('-', text)
+        |    LISTAGG(text, '-')
         |FROM TempTable1
         |GROUP BY TUMBLE(ts, INTERVAL '9' SECOND), id1
       """.stripMargin)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index 7a30103..8e36305 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -103,7 +103,7 @@ class DistinctAggregateTest(
 
   @Test
   def testSingleConcatAggWithDistinctAgg(): Unit = {
-    util.verifyPlan("SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
+    util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
index 5ce79fc..99ed303 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
@@ -22,14 +22,16 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.planner.utils.{TableFunc0, TableTestBase}
+import org.apache.flink.types.Row
 
+import org.junit.Assert.{assertTrue, fail}
 import org.junit.Test
 
 class AggregateValidationTest extends TableTestBase {
+  private val util = scalaStreamTestUtil()
 
   @Test(expected = classOf[ValidationException])
   def testGroupingOnNonExistentField(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     val ds = table
@@ -40,7 +42,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testGroupingInvalidSelection(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     table
@@ -51,7 +52,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidAggregationInSelection(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     table
@@ -63,7 +63,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidWindowPropertiesInSelection(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     table
@@ -75,7 +74,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[RuntimeException])
   def testTableFunctionInSelection(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     util.addFunction("func", new TableFunc0)
@@ -90,7 +88,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidScalarFunctionInAggregate(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     table
@@ -102,7 +99,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidTableFunctionInAggregate(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     util.addFunction("func", new TableFunc0)
@@ -115,13 +111,52 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[RuntimeException])
   def testMultipleAggregateExpressionInAggregate(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
-
     util.addFunction("func", new TableFunc0)
+    val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
     table
       .groupBy('a)
       // must fail. Only one AggregateFunction can be used in aggregate
       .aggregate("sum(c), count(b)")
   }
+
+  @Test
+  def testIllegalArgumentForListAgg(): Unit = {
+    util.addTableSource[(Long, Int, String, String)]("T", 'a, 'b, 'c, 'd)
+    // If there are two parameters, the second one must be a character literal.
+    expectExceptionThrown(
+      "SELECT listagg(c, d) FROM T GROUP BY a",
+      "Supported form(s): 'LISTAGG(<CHARACTER>)'\n'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)",
+      classOf[ValidationException])
+  }
+
+  @Test
+  def testIllegalArgumentForListAgg1(): Unit = {
+    util.addTableSource[(Long, Int, String, String)]("T", 'a, 'b, 'c, 'd)
+    // If there are two parameters, the second one must be a character literal.
+    expectExceptionThrown(
+      "SELECT LISTAGG(c, 1) FROM T GROUP BY a",
+      "Supported form(s): 'LISTAGG(<CHARACTER>)'\n'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)",
+      classOf[ValidationException])
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  private def expectExceptionThrown(
+      sql: String,
+      keywords: String,
+      clazz: Class[_ <: Throwable] = classOf[ValidationException])
+  : Unit = {
+    try {
+      util.tableEnv.toAppendStream[Row](util.tableEnv.sqlQuery(sql))
+      fail(s"Expected a $clazz, but no exception was thrown.")
+    } catch {
+      case e if e.getClass == clazz =>
+        if (keywords != null) {
+          assertTrue(
+            s"The exception message '${e.getMessage}' doesn't contain keyword '$keywords'",
+            e.getMessage.contains(keywords))
+        }
+      case e: Throwable => fail(s"Expected ${clazz.getSimpleName} to be thrown, but got $e.")
+    }
+  }
 }
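
A compact sketch of what the two new validation tests accept and reject,
assuming the T(a, b, c, d) source they register (illustration only, not part
of the patch):

    SELECT LISTAGG(c) FROM T GROUP BY a        -- ok: single argument
    SELECT LISTAGG(c, '#') FROM T GROUP BY a   -- ok: delimiter is a character literal
    SELECT LISTAGG(c, d) FROM T GROUP BY a     -- rejected: delimiter is a column
    SELECT LISTAGG(c, 1) FROM T GROUP BY a     -- rejected: delimiter is not character-typed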
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
index f64b0b6..eed0d3e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
@@ -138,27 +138,26 @@ class SortAggITCase
   }
 
   // NOTE: Spark has agg functions collect_list(), collect_set().
-  //       instead, we'll test concat_agg() here
-  @Ignore
+  //       Instead, we'll test LISTAGG() here.
   @Test
   def testConcatAgg(): Unit = {
     checkResult(
-      "SELECT concat_agg('-', c), concat_agg(c) FROM SmallTable3",
+      "SELECT LISTAGG(c, '-'), LISTAGG(c) FROM SmallTable3",
       Seq(
-        row("Hi-Hello-Hello world", "Hi\nHello\nHello world")
+        row("Hi-Hello-Hello world", "Hi,Hello,Hello world")
       )
     )
 
     // EmptyTable5
     checkResult(
-      "SELECT concat_agg('-', g), concat_agg(g) FROM EmptyTable5",
+      "SELECT LISTAGG(g, '-'), LISTAGG(g) FROM EmptyTable5",
       Seq(
         row(null, null)
       )
     )
 
     checkResult(
-      "SELECT concat_agg('-', c), concat_agg(c) FROM AllNullTable3",
+      "SELECT LISTAGG(c, '-'), LISTAGG(c) FROM AllNullTable3",
       Seq(
         row(null, null)
       )
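
A worked example of the expected rows above (column values come from the
SmallTable3 fixture used in the test; the derivation is ours):

    SELECT LISTAGG(c, '-'), LISTAGG(c) FROM SmallTable3
    -- c = 'Hi', 'Hello', 'Hello world'
    -- => 'Hi-Hello-Hello world' and 'Hi,Hello,Hello world' (the ANSI default
    --    delimiter is now ','); an empty or all-NULL input yields NULL.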
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 318487fe..41561a3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.planner.functions.aggfunctions.{ConcatWithRetractAggFunction, ConcatWsWithRetractAggFunction}
+import org.apache.flink.table.planner.functions.aggfunctions.{ListAggWithRetractAggFunction, ListAggWsWithRetractAggFunction}
 import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSumAggFunction
 import org.apache.flink.table.planner.runtime.batch.sql.agg.{MyPojoAggFunction, VarArgsAggFunction}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithAggTestBase.AggMode
@@ -605,7 +605,7 @@ class AggregateITCase(
 
     val sqlQuery =
       s"""
-         |SELECT len, concat_agg('#', content) FROM T GROUP BY len
+         |SELECT len, listagg(content, '#') FROM T GROUP BY len
        """.stripMargin
 
     val sink = new TestingRetractSink
@@ -629,7 +629,7 @@ class AggregateITCase(
 
     val sqlQuery =
       s"""
-         |SELECT len, concat_agg(content) FROM T GROUP BY len
+         |SELECT len, listagg(content) FROM T GROUP BY len
        """.stripMargin
 
     val sink = new TestingRetractSink
@@ -906,7 +906,7 @@ class AggregateITCase(
       """
         |SELECT b, min(c), max(c)
         |FROM (
-        | SELECT a, b, concat_agg(c) as c
+        | SELECT a, b, listagg(c) as c
         | FROM T
         | GROUP BY a, b)
         |GROUP BY b
@@ -1061,15 +1061,15 @@ class AggregateITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
-  /** Test CONCAT_AGG **/
+  /** Test LISTAGG **/
   @Test
   def testConcatAgg(): Unit = {
-    tEnv.registerFunction("concat_agg_retract", new ConcatWithRetractAggFunction)
-    tEnv.registerFunction("concat_agg_ws_retract", new ConcatWsWithRetractAggFunction)
+    tEnv.registerFunction("listagg_retract", new ListAggWithRetractAggFunction)
+    tEnv.registerFunction("listagg_ws_retract", new ListAggWsWithRetractAggFunction)
     val sqlQuery =
       s"""
          |SELECT
-         |  concat_agg(c), concat_agg('-', c), concat_agg_retract(c), concat_agg_ws_retract('+', c)
+         |  listagg(c), listagg(c, '-'), listagg_retract(c), listagg_ws_retract(c, '+')
          |FROM MyTable
          |GROUP BY c
          |""".stripMargin
@@ -1085,8 +1085,8 @@ class AggregateITCase(
     val sink = new TestingRetractSink
     tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink)
     env.execute()
-    val expected = List("Hi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi,Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi," +
-      "Hi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi,Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi")
+    val expected = List("Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi," +
+      "Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
index 659f747..f2c29c6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
@@ -125,7 +125,7 @@ class AggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
 //  @Test
 //  def testSimpleLogical(): Unit = {
 //    val t = failingDataSource(smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
-//      .select('c.firstValue, 'c.lastValue, 'c.concat_agg("#"))
+//      .select('c.firstValue, 'c.lastValue, 'c.LISTAGG("#"))
 //
 //    val sink = new TestingRetractSink()
 //    t.toRetractStream[Row].addSink(sink)


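As a rough before/after sketch of the LISTAGG rename above (reusing the
TempTable query from ExplainTest; illustration only, not part of the patch):

    -- before: delimiter came first
    SELECT id1, CONCAT_AGG('#', text) FROM TempTable GROUP BY id1, TUMBLE(ts, INTERVAL '8' SECOND)

    -- after: expression first, delimiter second, per ANSI LISTAGG
    SELECT id1, LISTAGG(text, '#') FROM TempTable GROUP BY id1, TUMBLE(ts, INTERVAL '8' SECOND)
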
[flink] 02/03: [FLINK-13529][table-planner-blink] Remove the second parameter of FIRST_VALUE and LAST_VALUE

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 48d5ba85e6f8e2428dabe6f6970d7f37c5ed7bdf
Author: beyond1920 <be...@126.com>
AuthorDate: Thu Aug 1 17:38:19 2019 +0800

    [FLINK-13529][table-planner-blink] Remove the second parameter of FIRST_VALUE and LAST_VALUE
    
    According to ANSI-SQL, FIRST_VALUE and LAST_VALUE are ordered-set functions, which require a WITHIN GROUP clause to specify an order instead of passing the order field as a parameter.
    
    This closes #9316
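    
    A sketch of the user-facing effect (MyTable(a, b, c) mirrors the planner
    test fixture below; illustration only):
    
        -- still accepted: take some first value of c per group
        SELECT a, FIRST_VALUE(c) FROM MyTable GROUP BY a
    
        -- previously accepted, now rejected: the second argument ordered the
        -- values by b, which ANSI SQL expresses with WITHIN GROUP instead
        SELECT a, FIRST_VALUE(c, b) FROM MyTable GROUP BY a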
---
 .../sql/SqlFirstLastValueAggFunction.java          | 34 +++++++----
 .../rules/logical/SplitAggregateRuleTest.scala     | 14 -----
 .../stream/sql/agg/DistinctAggregateTest.scala     | 15 -----
 .../runtime/stream/sql/AggregateITCase.scala       | 71 ----------------------
 .../runtime/stream/sql/OverWindowITCase.scala      | 37 +++++------
 .../runtime/stream/sql/SplitAggregateITCase.scala  | 22 -------
 6 files changed, 43 insertions(+), 150 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
index e4b8a11..305f3e1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.planner.functions.sql;
 
+import org.apache.flink.util.Preconditions;
+
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
 
 import org.apache.calcite.rel.type.RelDataType;
@@ -27,8 +29,8 @@ import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Optionality;
 
 import java.util.List;
 
@@ -36,30 +38,40 @@ import java.util.List;
  * <code>FIRST_VALUE</code> and <code>LAST_VALUE</code> aggregate functions
  * return the first or the last value in a list of values that are input to the
  * function.
+ *
+ * <p>NOTE: The difference between this and {@link org.apache.calcite.sql.fun.SqlFirstLastValueAggFunction}
+ * is that this can be used without an OVER clause.
  */
 public class SqlFirstLastValueAggFunction extends SqlAggFunction {
 
-	public SqlFirstLastValueAggFunction(SqlKind sqlKind) {
-		super(sqlKind.name(),
+	public SqlFirstLastValueAggFunction(SqlKind kind) {
+		super(
+				kind.name(),
 				null,
-				sqlKind,
+				kind,
 				ReturnTypes.ARG0_NULLABLE_IF_EMPTY,
 				null,
-				OperandTypes.or(OperandTypes.ANY, OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.ANY)),
+				OperandTypes.ANY,
 				SqlFunctionCategory.NUMERIC,
 				false,
-				false);
+				false,
+				Optionality.FORBIDDEN);
+		Preconditions.checkArgument(kind == SqlKind.FIRST_VALUE
+				|| kind == SqlKind.LAST_VALUE);
 	}
 
-	@Override
+	//~ Methods ----------------------------------------------------------------
+
+	@SuppressWarnings("deprecation")
 	public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) {
 		return ImmutableList.of(
-				typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true),
-				typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true));
+				typeFactory.createTypeWithNullability(
+						typeFactory.createSqlType(SqlTypeName.ANY), true));
 	}
 
-	@Override
+	@SuppressWarnings("deprecation")
 	public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
-		return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true);
+		return typeFactory.createTypeWithNullability(
+				typeFactory.createSqlType(SqlTypeName.ANY), true);
 	}
 }
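
A sketch of the javadoc note above (the MyTable names are hypothetical,
following the surrounding tests):

    -- Calcite's built-in FIRST_VALUE/LAST_VALUE only work as window functions:
    SELECT FIRST_VALUE(c) OVER (PARTITION BY a ORDER BY b) FROM MyTable

    -- this Flink variant is also usable as a plain group aggregate:
    SELECT a, FIRST_VALUE(c) FROM MyTable GROUP BY a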
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
index 5d58dfb..2680482 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
@@ -75,25 +75,11 @@ class SplitAggregateRuleTest extends TableTestBase {
   }
 
   @Test
-  def testSingleFirstValueWithOrderWithDistinctAgg(): Unit = {
-    // FIRST_VALUE with order is not splittable,
-    // so SplitAggregateRule can not be applied to the plan
-    util.verifyPlan("SELECT a, FIRST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
-  }
-
-  @Test
   def testSingleLastValueWithDistinctAgg(): Unit = {
     util.verifyPlan("SELECT a, LAST_VALUE(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
   }
 
   @Test
-  def testSingleLastValueWithOrderWithDistinctAgg(): Unit = {
-    // LAST_VALUE with order is not splittable,
-    // so SplitAggregateRule can not be applied to the plan
-    util.verifyPlan("SELECT a, LAST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
-  }
-
-  @Test
   def testSingleConcatAggWithDistinctAgg(): Unit = {
     util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index 8e36305..5cbbab4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -81,27 +81,12 @@ class DistinctAggregateTest(
   }
 
   @Test
-  def testSingleFirstValueWithOrderWithDistinctAgg(): Unit = {
-    // FIRST_VALUE is not mergeable, so the final plan does not contain local agg
-    // FIRST_VALUE with order is not splittable,
-    // so SplitAggregateRule can not be applied to the plan
-    util.verifyPlan("SELECT a, FIRST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
-  }
-
-  @Test
   def testSingleLastValueWithDistinctAgg(): Unit = {
     // LAST_VALUE is not mergeable, so the final plan does not contain local agg
     util.verifyPlan("SELECT a, LAST_VALUE(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
   }
 
   @Test
-  def testSingleLastValueWithOrderWithDistinctAgg(): Unit = {
-    // LAST_VALUE is not mergeable, so the final plan does not contain local agg
-    // LAST_VALUE with order is not splittable, so SplitAggregateRule can not be applied to the plan
-    util.verifyPlan("SELECT a, LAST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
-  }
-
-  @Test
   def testSingleConcatAggWithDistinctAgg(): Unit = {
     util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
   }
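
The same two ordered-variant tests are dropped here; only the single-argument,
non-mergeable forms remain. A hedged sketch of the runtime usage that still
works, modeled on the retained ITCase queries (table T, sink, and environment
wiring as set up elsewhere in these suites):

    val t1 = tEnv.sqlQuery(
      """
        |SELECT b, FIRST_VALUE(c), LAST_VALUE(c), COUNT(DISTINCT c)
        |FROM T
        |GROUP BY b
      """.stripMargin)

    val sink = new TestingRetractSink
    t1.toRetractStream[Row].addSink(sink)
    env.execute()
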
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 41561a3..75178e5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
@@ -472,76 +471,6 @@ class AggregateITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
-  @Test
-  def testFirstLastWithOrder(): Unit = {
-    // set all operator parallelism to 1 to make sure the processed input element is in order
-    env.setParallelism(1)
-    val data = new mutable.MutableList[(Long, String, String, Int, Long, String)]
-    data.+=((2L, "u1", "i1", 0, 0L, "b1"))
-    data.+=((-1L, "u1", "i1", 1, 1L, "b1"))
-    data.+=((3L, "u2", "i1", 1, 1L, "b1"))
-    data.+=((4L, "u2", null, 0, 0L, "b1"))
-
-    val t = failingDataSource(data).toTable(tEnv, 'o, 'u, 'i, 'v, 's, 'b)
-    tEnv.registerTable("T", t)
-    val t1 = tEnv.sqlQuery(
-      """
-        |SELECT first_value(u, lo) as f, last_value(u, lo) as l
-        |FROM (
-        |  SELECT b, u, i, last_value(o) as lo, last_value(v, o) as lv,
-        |    first_value(o) as fo, first_value(v, o) as fv
-        |  FROM T
-        |  GROUP BY u, i, b)
-        |GROUP BY i
-      """.stripMargin)
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List(
-      "u1,u2",
-      "u2,u2")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
-  @Test
-  def testFirstValueWithInputContainingNull(): Unit = {
-    val data = List(
-      Row.of("blond", null, Long.box(23L)),
-      Row.of("slim", null, Long.box(21L)),
-      Row.of("slim", null, Long.box(17L)),
-      Row.of("blond", null, Long.box(19L))
-    )
-
-    implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.LONG_TYPE_INFO,
-      BasicTypeInfo.LONG_TYPE_INFO) // tpe is automatically
-
-    val t = failingDataSource(data).toTable(tEnv, 't, 'name, 'age)
-    tEnv.registerTable("T", t)
-
-    /* use sql grammar to generate null input for firstValue,
-     * since fromCollection will throw exception when serializing null as Long
-     */
-    val t1 = tEnv.sqlQuery(
-      """
-        |SELECT t,
-        |first_value(name, age) as c,
-        |last_value(name, age) as d
-        |FROM T
-        |GROUP BY t
-      """.stripMargin)
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("slim,null,null", "blond,null,null")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
   /** test unbounded groupBy (without window) **/
   @Test
   def testUnboundedGroupBy(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
index fddb45f..0c11794 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
@@ -339,10 +339,6 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
       "  c, b, " +
       "  LTCNT(a, CAST('4' AS BIGINT)) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
       "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
-      "  first_value(a, a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
-      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
-      "  last_value(a, a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
-      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
       "  COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
       "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
       "  SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
@@ -354,19 +350,26 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
     env.execute()
 
     val expected = List(
-      "Hello,1,0,1,1,1,1", "Hello,15,0,1,1,2,2", "Hello,16,0,1,1,3,3",
-      "Hello,2,0,1,2,6,9", "Hello,3,0,1,2,6,9", "Hello,2,0,1,2,6,9",
-      "Hello,3,0,2,3,4,9",
-      "Hello,4,0,3,4,2,7",
-      "Hello,5,1,4,5,2,9",
-      "Hello,6,2,5,6,2,11", "Hello,65,2,6,6,2,12",
-      "Hello,9,2,6,6,2,12", "Hello,9,2,6,6,2,12", "Hello,18,3,6,6,3,18",
-      "Hello World,17,3,7,7,3,21",
-      "Hello World,7,1,7,7,1,7",
-      "Hello World,77,3,7,7,3,21",
-      "Hello World,18,1,7,7,1,7",
-      "Hello World,8,2,7,8,2,15",
-      "Hello World,20,1,20,20,1,20")
+      "Hello,1,0,1,1",
+      "Hello,15,0,2,2",
+      "Hello,16,0,3,3",
+      "Hello,2,0,6,9",
+      "Hello,3,0,6,9",
+      "Hello,2,0,6,9",
+      "Hello,3,0,4,9",
+      "Hello,4,0,2,7",
+      "Hello,5,1,2,9",
+      "Hello,6,2,2,11",
+      "Hello,65,2,2,12",
+      "Hello,9,2,2,12",
+      "Hello,9,2,2,12",
+      "Hello,18,3,3,18",
+      "Hello World,17,3,3,21",
+      "Hello World,7,1,1,7",
+      "Hello World,77,3,3,21",
+      "Hello World,18,1,1,7",
+      "Hello World,8,2,2,15",
+      "Hello World,20,1,1,20")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
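
Dropping the two first_value/last_value columns from the projection shrinks
every expected row from seven comma-separated fields to five, which accounts
for the wholesale rewrite of the expected list above; for instance:

    // before: c, b, LTCNT, first_value, last_value, COUNT, SUM
    //   "Hello,15,0,1,1,2,2"
    // after:  c, b, LTCNT, COUNT, SUM
    //   "Hello,15,0,2,2"
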
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index 66bbfcc..6f7df33 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -195,28 +195,6 @@ class SplitAggregateITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
-  @Test
-  def testFirstValueLastValueWithRetraction(): Unit = {
-    val t1 = tEnv.sqlQuery(
-      s"""
-         |SELECT
-         |  b, FIRST_VALUE(c, a), LAST_VALUE(c, a), COUNT(DISTINCT c)
-         |FROM(
-         |  SELECT
-         |    a, COUNT(DISTINCT b) as b, MAX(b) as c
-         |  FROM T
-         |  GROUP BY a
-         |) GROUP BY b
-       """.stripMargin)
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("2,2,6,2", "4,5,5,1", "1,5,5,1")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
   @Ignore("[FLINK-12088]: JOIN is not supported")
   @Test
   def testAggWithJoin(): Unit = {