You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "Hisoka-X (via GitHub)" <gi...@apache.org> on 2023/08/08 15:12:42 UTC

[GitHub] [spark] Hisoka-X opened a new pull request, #42398: [SPARK-42746][SQL] Add the LIST_AGG() aggregate function

Hisoka-X opened a new pull request, #42398:
URL: https://github.com/apache/spark/pull/42398

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   `listagg()` is a common and useful aggregation function to concatenate string values in a column, optionally by a certain order. The systems below have supported such function already:
   
   Oracle: https://docs.oracle.com/cd/E11882_01/server.112/e41084/functions089.htm#SQLRF30030
   Snowflake: https://docs.snowflake.com/en/sql-reference/functions/listagg
   Amazon Redshift: https://docs.aws.amazon.com/redshift/latest/dg/r_LISTAGG.html
   Google BigQuery: https://cloud.google.com/bigquery/docs/reference/standard-sql/functions-and-operators#string_agg
   
   This PR add `LISTAGG`  aggregate function support.
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   Add LIST_AGG() aggregate function
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   no
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   add new test.
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "khalidmammadov (via GitHub)" <gi...@apache.org>.
khalidmammadov commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349758338


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)

Review Comment:
   What is the value for these 2 auxilary constractors when default class parameters are also supplied for the same default values?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369934056


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      val sorted = if (sameExpression) {
+        if (reverse) {
+          buffer.toSeq.sorted(ordering.reverse)
+        } else {
+          buffer.toSeq.sorted(ordering)
+        }
+      } else {
+        if (reverse) {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        } else {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+        }
+      }
+      UTF8String.fromString(sorted.map(_.toString)
+        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
+    } else {
+      UTF8String.fromString("")
+    }
+  }
+
+  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val v = if (sameExpression) {
+        convertToBufferElement(value)
+      } else {
+        InternalRow.apply(convertToBufferElement(value),
+          convertToBufferElement(orderExpression.eval(input)))
+      }
+      buffer += v
+    }
+    buffer
+  }
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int) : ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression

Review Comment:
   addressed all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349822359


##########
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##########
@@ -158,6 +158,57 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }
 
+  test("SPARK-42746: listagg function") {
+    withTempView("df", "df2") {
+      Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b")
+        .createOrReplaceTempView("df")
+      checkAnswer(
+        sql("select listagg(b) from df group by a"),
+        Row("b,c") :: Row("c,d") :: Row(null) :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b, '|') from df group by a"),
+        Row("b|c") :: Row("c|d") :: Row(null) :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+        Row("a,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+        Row("b,b,a,a") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+          "OVER (PARTITION BY b) FROM df"),
+        Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row(null) :: Nil)
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df")

Review Comment:
   Yes, so I add `FUNCTION_AND_ORDER_EXPRESSION_MISMATCH` error for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1788269081

   kindly ping @cloud-fan @MaxGekk @HyukjinKwon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #42398: [SPARK-42746][SQL] Add the LISTAGG() aggregate function

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1287925964


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +

Review Comment:
   already add it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369484714


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   ```
   ListAgg (...) extends TypedImperativeAggregate[mutable.ArrayBuffer[Any]] with BinaryLike[Expression] {
     private val collectList = CollectList()
     ...
     override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
       collectList.update(...)
     }
   
     override def merge(buffer: ArrayBuffer[Any],, other: Any): Any = {
       collectList.merge(...)
     }
   
     override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
       val ordering = PhysicalDataType.ordering(orderExpr.dataType)
       val sorted = // sort logic here
      
       new GenericArrayData(sorted.toArray)
     }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369980594


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -920,6 +920,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
         "fieldNames" -> v1Table.schema.fieldNames.mkString(", ")))
   }
 
+  def functionAndOrderExpressionMismatchError(
+      functionName: String,
+      functionExpr: Expression,
+      orderExpr: Expression): Throwable = {
+    new AnalysisException(
+      errorClass = "FUNCTION_AND_ORDER_EXPRESSION_MISMATCH",
+      messageParameters = Map(
+        "functionName" -> toSQLStmt(functionName),

Review Comment:
   addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1368615704


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   Because the order by column can be different with agg column. So we should save it both.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1778406651

   cc @cloud-fan @MaxGekk I'm okay with most of this PR. Please help me to review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #42398: [SPARK-42746][SQL] Add the LISTAGG() aggregate function

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1287854994


##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.tokens:
##########
@@ -9,339 +9,374 @@ ADD=8
 AFTER=9
 ALL=10
 ALTER=11
-ANALYZE=12
-AND=13
-ANTI=14
-ANY=15
-ANY_VALUE=16
-ARCHIVE=17
-ARRAY=18
-AS=19
-ASC=20
-AT=21
-AUTHORIZATION=22
-BETWEEN=23
-BOTH=24
-BUCKET=25
-BUCKETS=26
-BY=27
-CACHE=28
-CASCADE=29
-CASE=30
-CAST=31
-CATALOG=32
-CATALOGS=33
-CHANGE=34
-CHECK=35
-CLEAR=36
-CLUSTER=37
-CLUSTERED=38
-CODEGEN=39
-COLLATE=40
-COLLECTION=41
-COLUMN=42
-COLUMNS=43
-COMMENT=44
-COMMIT=45
-COMPACT=46
-COMPACTIONS=47
-COMPUTE=48
-CONCATENATE=49
-CONSTRAINT=50
-COST=51
-CREATE=52
-CROSS=53
-CUBE=54
-CURRENT=55
-CURRENT_DATE=56
-CURRENT_TIME=57
-CURRENT_TIMESTAMP=58
-CURRENT_USER=59
-DAY=60
-DAYS=61
-DAYOFYEAR=62
-DATA=63
-DATABASE=64
-DATABASES=65
-DATEADD=66
-DATEDIFF=67
-DBPROPERTIES=68
-DEFAULT=69
-DEFINED=70
-DELETE=71
-DELIMITED=72
-DESC=73
-DESCRIBE=74
-DFS=75
-DIRECTORIES=76
-DIRECTORY=77
-DISTINCT=78
-DISTRIBUTE=79
-DIV=80
-DROP=81
-ELSE=82
-END=83
-ESCAPE=84
-ESCAPED=85
-EXCEPT=86
-EXCHANGE=87
-EXCLUDE=88
-EXISTS=89
-EXPLAIN=90
-EXPORT=91
-EXTENDED=92
-EXTERNAL=93
-EXTRACT=94
-FALSE=95
-FETCH=96
-FIELDS=97
-FILTER=98
-FILEFORMAT=99
-FIRST=100
-FOLLOWING=101
-FOR=102
-FOREIGN=103
-FORMAT=104
-FORMATTED=105
-FROM=106
-FULL=107
-FUNCTION=108
-FUNCTIONS=109
-GLOBAL=110
-GRANT=111
-GROUP=112
-GROUPING=113
-HAVING=114
-HOUR=115
-HOURS=116
-IF=117
-IGNORE=118
-IMPORT=119
-IN=120
-INCLUDE=121
-INDEX=122
-INDEXES=123
-INNER=124
-INPATH=125
-INPUTFORMAT=126
-INSERT=127
-INTERSECT=128
-INTERVAL=129
-INTO=130
-IS=131
-ITEMS=132
-JOIN=133
-KEYS=134
-LAST=135
-LATERAL=136
-LAZY=137
-LEADING=138
-LEFT=139
-LIKE=140
-ILIKE=141
-LIMIT=142
-LINES=143
-LIST=144
-LOAD=145
-LOCAL=146
-LOCATION=147
-LOCK=148
-LOCKS=149
-LOGICAL=150
-MACRO=151
-MAP=152
-MATCHED=153
-MERGE=154
-MICROSECOND=155
-MICROSECONDS=156
-MILLISECOND=157
-MILLISECONDS=158
-MINUTE=159
-MINUTES=160
-MONTH=161
-MONTHS=162
-MSCK=163
-NAMESPACE=164
-NAMESPACES=165
-NANOSECOND=166
-NANOSECONDS=167
-NATURAL=168
-NO=169
-NOT=170
-NULL=171
-NULLS=172
-OF=173
-OFFSET=174
-ON=175
-ONLY=176
-OPTION=177
-OPTIONS=178
-OR=179
-ORDER=180
-OUT=181
-OUTER=182
-OUTPUTFORMAT=183
-OVER=184
-OVERLAPS=185
-OVERLAY=186
-OVERWRITE=187
-PARTITION=188
-PARTITIONED=189
-PARTITIONS=190
-PERCENTILE_CONT=191
-PERCENTILE_DISC=192
-PERCENTLIT=193
-PIVOT=194
-PLACING=195
-POSITION=196
-PRECEDING=197
-PRIMARY=198
-PRINCIPALS=199
-PROPERTIES=200
-PURGE=201
-QUARTER=202
-QUERY=203
-RANGE=204
-RECORDREADER=205
-RECORDWRITER=206
-RECOVER=207
-REDUCE=208
-REFERENCES=209
-REFRESH=210
-RENAME=211
-REPAIR=212
-REPEATABLE=213
-REPLACE=214
-RESET=215
-RESPECT=216
-RESTRICT=217
-REVOKE=218
-RIGHT=219
-RLIKE=220
-ROLE=221
-ROLES=222
-ROLLBACK=223
-ROLLUP=224
-ROW=225
-ROWS=226
-SECOND=227
-SECONDS=228
-SCHEMA=229
-SCHEMAS=230
-SELECT=231
-SEMI=232
-SEPARATED=233
-SERDE=234
-SERDEPROPERTIES=235
-SESSION_USER=236
-SET=237
-SETMINUS=238
-SETS=239
-SHOW=240
-SKEWED=241
-SOME=242
-SORT=243
-SORTED=244
-SOURCE=245
-START=246
-STATISTICS=247
-STORED=248
-STRATIFY=249
-STRUCT=250
-SUBSTR=251
-SUBSTRING=252
-SYNC=253
-SYSTEM_TIME=254
-SYSTEM_VERSION=255
-TABLE=256
-TABLES=257
-TABLESAMPLE=258
-TARGET=259
-TBLPROPERTIES=260
-TEMPORARY=261
-TERMINATED=262
-THEN=263
-TIME=264
-TIMESTAMP=265
-TIMESTAMPADD=266
-TIMESTAMPDIFF=267
-TO=268
-TOUCH=269
-TRAILING=270
-TRANSACTION=271
-TRANSACTIONS=272
-TRANSFORM=273
-TRIM=274
-TRUE=275
-TRUNCATE=276
-TRY_CAST=277
-TYPE=278
-UNARCHIVE=279
-UNBOUNDED=280
-UNCACHE=281
-UNION=282
-UNIQUE=283
-UNKNOWN=284
-UNLOCK=285
-UNPIVOT=286
-UNSET=287
-UPDATE=288
-USE=289
-USER=290
-USING=291
-VALUES=292
-VERSION=293
-VIEW=294
-VIEWS=295
-WEEK=296
-WEEKS=297
-WHEN=298
-WHERE=299
-WINDOW=300
-WITH=301
-WITHIN=302
-YEAR=303
-YEARS=304
-ZONE=305
-EQ=306
-NSEQ=307
-NEQ=308
-NEQJ=309
-LT=310
-LTE=311
-GT=312
-GTE=313
-PLUS=314
-MINUS=315
-ASTERISK=316
-SLASH=317
-PERCENT=318
-TILDE=319
-AMPERSAND=320
-PIPE=321
-CONCAT_PIPE=322
-HAT=323
-COLON=324
-ARROW=325
-HENT_START=326
-HENT_END=327
-STRING=328
-DOUBLEQUOTED_STRING=329
-BIGINT_LITERAL=330
-SMALLINT_LITERAL=331
-TINYINT_LITERAL=332
-INTEGER_VALUE=333
-EXPONENT_VALUE=334
-DECIMAL_VALUE=335
-FLOAT_LITERAL=336
-DOUBLE_LITERAL=337
-BIGDECIMAL_LITERAL=338
-IDENTIFIER=339
-BACKQUOTED_IDENTIFIER=340
-SIMPLE_COMMENT=341
-BRACKETED_COMMENT=342
-WS=343
-UNRECOGNIZED=344
+ALWAYS=12

Review Comment:
   This file auto regenerated, I'm not sure should revert it or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369777027


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -36,8 +39,7 @@ import org.apache.spark.util.BoundedPriorityQueue
  * We have to store all the collected elements in memory, and so notice that too many elements
  * can cause GC paused and eventually OutOfMemory Errors.
  */
-abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T]
-  with UnaryLike[Expression] {
+abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] {

Review Comment:
   Change `Collect` is looks good now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369609547


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   > You can try to not define bufferElementType.
   
   These are some problems I should to solve.
   1. If I use `CollectList` in `ListAgg`, I should redefine `bufferElementType` in `CollectList` so that we can save two fields value each `InternalRow`. (Seem like can not do that at now)
   2. If not define `bufferElementType`, seem like I can not use `CollectList` in `ListAgg`. I shouldn't `extend Collect`, so that I can do it without `BinaryLike` and `UnaryLike` problem.
   3. The plan for now, I removed `UnaryLike` of  `Collect` so that I can use `BinaryLike` and `extend Collect` in `ListAgg` at the same time.
   
   So which way should I continue?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370049863


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##########
@@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("a,b", "b,d"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "a,a", "b", "b,b", ""))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }

Review Comment:
   also add new one https://github.com/apache/spark/pull/42398/commits/87999f4c98799c5365efcb0cf0e83fd8f0aa3dbc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369921181


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      val sorted = if (sameExpression) {
+        if (reverse) {
+          buffer.toSeq.sorted(ordering.reverse)
+        } else {
+          buffer.toSeq.sorted(ordering)
+        }
+      } else {
+        if (reverse) {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        } else {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+        }
+      }
+      UTF8String.fromString(sorted.map(_.toString)
+        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
+    } else {
+      UTF8String.fromString("")
+    }
+  }
+
+  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val v = if (sameExpression) {
+        convertToBufferElement(value)
+      } else {
+        InternalRow.apply(convertToBufferElement(value),
+          convertToBufferElement(orderExpression.eval(input)))
+      }
+      buffer += v
+    }
+    buffer
+  }
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int) : ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression

Review Comment:
   Please put these functions forward: `createAggregationBuffer`, `nullable`, `dataType`, `left` and `right`.
   You can reference other function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on a diff in pull request #42398: [SPARK-42746][SQL] Add the LISTAGG() aggregate function

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1288951240


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -884,6 +884,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       messageParameters = Map("database" -> quoted))
   }
 
+  def functionAndOrderExpressionMismatchError(
+      functionName: String,
+      functionExpr: Expression,
+      orderExpr: Expression): Throwable = {
+    new AnalysisException(
+      errorClass = "FUNCTION_AND_ORDER_EXPRESSION_MISMATCH_ERROR",
+      messageParameters = Map(
+        "functionName" -> functionName,

Review Comment:
   If it is an id, quote it by `toSQLId`, if it is a SQL statement (or a part of it), use `toSQLStmt`



##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -921,6 +921,11 @@
     ],
     "sqlState" : "42809"
   },
+  "FUNCTION_AND_ORDER_EXPRESSION_MISMATCH_ERROR" : {

Review Comment:
   All error classes are errors, the suffix `_ERROR` doesn't give any additional info. Could you remove it, please:
   ```suggestion
     "FUNCTION_AND_ORDER_EXPRESSION_MISMATCH" : {
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -2203,6 +2203,38 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }
   }
 
+  /**
+   * Create a ListAgg expression.
+   */
+  override def visitListAgg(ctx: ListAggContext): AnyRef = {
+    val column = expression(ctx.aggEpxr)
+    val sortOrder = visitSortItem(ctx.sortItem)
+    if (!column.semanticEquals(sortOrder.child)) {
+      throw QueryCompilationErrors.functionAndOrderExpressionMismatchError("list_agg", column,

Review Comment:
   Why `list_agg` but not `listagg`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "holdenk (via GitHub)" <gi...@apache.org>.
holdenk commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1351071287


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+      buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   So the idea with that is to bubble up the distinct to avoid a shuffle?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1982086020

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349818074


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);

Review Comment:
   Could you add an example with duplicate values? With DISTINCT or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "hopefulnick (via GitHub)" <gi...@apache.org>.
hopefulnick commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1387915950


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -2214,6 +2214,30 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }
   }
 
+  /**
+   * Create a ListAgg expression.
+   */
+  override def visitListAgg(ctx: ListAggContext): AnyRef = {
+    val column = expression(ctx.aggEpxr)
+    val sortOrder = visitSortItem(ctx.sortItem)
+    val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
+    if (!column.semanticEquals(sortOrder.child) && isDistinct) {
+      throw QueryCompilationErrors.functionAndOrderExpressionMismatchError("LISTAGG", column,
+        sortOrder.child)
+    }
+    val delimiter = if (ctx.delimiter != null) Literal(ctx.delimiter.getText) else Literal(",")

Review Comment:
   ctx.delimiter.getText includes quotes, like "','"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "hopefulnick (via GitHub)" <gi...@apache.org>.
hopefulnick commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1387425067


##########
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##########
@@ -158,6 +158,69 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }
 
+  test("SPARK-42746: listagg function") {
+    withTempView("df", "df2") {
+      Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b")
+        .createOrReplaceTempView("df")
+      checkAnswer(
+        sql("select listagg(b) from df group by a"),
+        Row("") :: Row("b,c") :: Row("c,d") :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b) from df where 1 != 1"),
+        Row("") :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b, '|') from df group by a"),
+        Row("b|c") :: Row("c|d") :: Row("") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+        Row("a,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+        Row("b,b,a,a") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+          "OVER (PARTITION BY b) FROM df"),
+        Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row("") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b DESC) FROM df"),

Review Comment:
   when specifying the custom seperator, like ',', it will get "b','a','b,','a", not the expected result "b,a,b,a"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1371068875


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      lazy val sortFunc = (sameExpression, reverse) match {
+        case (true, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering.reverse)
+        case (true, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering)
+        case (false, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        case (false, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+      }
+      val sorted = sortFunc(buffer)
+      UTF8String.fromString(sorted.map(_.toString)
+        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
+    } else {
+      UTF8String.fromString("")
+    }
+  }
+
+  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val v = if (sameExpression) {
+        convertToBufferElement(value)
+      } else {
+        InternalRow.apply(convertToBufferElement(value),
+          convertToBufferElement(orderExpression.eval(input)))

Review Comment:
   cc @cloud-fan @MaxGekk 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1371026410


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      lazy val sortFunc = (sameExpression, reverse) match {
+        case (true, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering.reverse)
+        case (true, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering)
+        case (false, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        case (false, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+      }
+      val sorted = sortFunc(buffer)
+      UTF8String.fromString(sorted.map(_.toString)
+        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
+    } else {
+      UTF8String.fromString("")
+    }
+  }
+
+  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val v = if (sameExpression) {
+        convertToBufferElement(value)
+      } else {
+        InternalRow.apply(convertToBufferElement(value),
+          convertToBufferElement(orderExpression.eval(input)))

Review Comment:
   This one can not be changed, we must save value into buffer after execute `InternalRow.copyValue`. Please refer https://github.com/apache/spark/pull/42398#discussion_r1349839317 . Without this, the value in buffer alway have same value of last one row.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      lazy val sortFunc = (sameExpression, reverse) match {

Review Comment:
   addressed all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on pull request #42398: [SPARK-42746][SQL] Add the LIST_AGG() aggregate function

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1669812879

   cc @MaxGekk @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on pull request #42398: [SPARK-42746][SQL] Add the LISTAGG() aggregate function

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1713990204

   Hi @MaxGekk , could you continue review this PR? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349828105


##########
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##########
@@ -158,6 +158,57 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }
 
+  test("SPARK-42746: listagg function") {
+    withTempView("df", "df2") {
+      Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b")
+        .createOrReplaceTempView("df")
+      checkAnswer(
+        sql("select listagg(b) from df group by a"),
+        Row("b,c") :: Row("c,d") :: Row(null) :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b, '|') from df group by a"),
+        Row("b|c") :: Row("c|d") :: Row(null) :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+        Row("a,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+        Row("b,b,a,a") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+          "OVER (PARTITION BY b) FROM df"),
+        Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row(null) :: Nil)
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df")

Review Comment:
   But this test case without `DISTINCT` keyword.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369579705


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   I see. You can try to not define `bufferElementType`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369925570


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -2214,6 +2214,40 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }
   }
 
+  /**
+   * Create a ListAgg expression.
+   */
+  override def visitListAgg(ctx: ListAggContext): AnyRef = {
+    val column = expression(ctx.aggEpxr)
+    val sortOrder = visitSortItem(ctx.sortItem)
+    val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
+    if (!column.semanticEquals(sortOrder.child) && isDistinct) {
+      throw QueryCompilationErrors.functionAndOrderExpressionMismatchError("LISTAGG", column,
+        sortOrder.child)
+    }
+    val listAgg = if (ctx.delimiter != null) {
+      sortOrder.direction match {
+        case Ascending => ListAgg(column, Literal(ctx.delimiter.getText), sortOrder.child,
+          false)
+        case Descending => ListAgg(column, Literal(ctx.delimiter.getText), sortOrder.child,
+          true)
+      }
+    } else {
+      sortOrder.direction match {
+        case Ascending => ListAgg(column, Literal(","), sortOrder.child, false)
+        case Descending => ListAgg(column, Literal(","), sortOrder.child, true)
+      }
+    }

Review Comment:
   ```
   val delimiter = ...
   val reverse = ...
   val listAgg = ListAgg(column, delimiter, sortOrder.child, reverse)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370041468


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##########
@@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("a,b", "b,d"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "a,a", "b", "b,b", ""))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }

Review Comment:
   Is there have the test case with different input column and sort column?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1752261690

   > @Hisoka-X Is `LISTAGG` an ANSI standard?
   
   Yes, refer https://modern-sql.com/feature/listagg


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349836810


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+      buffer: OpenHashMap[AnyRef, Long],
+      input: InternalRow): OpenHashMap[AnyRef, Long] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val key = InternalRow.copyValue(value)

Review Comment:
   The key cached here and not related to `InternalRow`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349833053


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+      buffer: OpenHashMap[AnyRef, Long],
+      input: InternalRow): OpenHashMap[AnyRef, Long] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val key = InternalRow.copyValue(value)

Review Comment:
   The value of InternalRow will be rewrite. So we must copy it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1405287835


##########
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##########
@@ -158,6 +158,69 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }
 
+  test("SPARK-42746: listagg function") {
+    withTempView("df", "df2") {
+      Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b")
+        .createOrReplaceTempView("df")
+      checkAnswer(
+        sql("select listagg(b) from df group by a"),
+        Row("") :: Row("b,c") :: Row("c,d") :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b) from df where 1 != 1"),
+        Row("") :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b, '|') from df group by a"),
+        Row("b|c") :: Row("c|d") :: Row("") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+        Row("a,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+        Row("b,b,a,a") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+          "OVER (PARTITION BY b) FROM df"),
+        Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row("") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b DESC) FROM df"),

Review Comment:
   Thanks @hopefulnick , sorry for late response, let me fix it today.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349828939


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(

Review Comment:
   After a second consider, can we reuse the `CollectList` that is an existent aggregate function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349817389


##########
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##########
@@ -158,6 +158,57 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }
 
+  test("SPARK-42746: listagg function") {
+    withTempView("df", "df2") {
+      Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b")
+        .createOrReplaceTempView("df")
+      checkAnswer(
+        sql("select listagg(b) from df group by a"),
+        Row("b,c") :: Row("c,d") :: Row(null) :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b, '|') from df group by a"),
+        Row("b|c") :: Row("c|d") :: Row(null) :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+        Row("a,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+        Row("b,b,a,a") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+          "OVER (PARTITION BY b) FROM df"),
+        Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row(null) :: Nil)
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df")

Review Comment:
   Is the behavior correct? According to https://docs.snowflake.com/en/sql-reference/functions/listagg, the mismatch issues if you specify different columns for `DISTINCT` and `WITHIN GROUP`.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);

Review Comment:
   Could you add an example with duplicate values?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370043175


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##########
@@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("a,b", "b,d"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "a,a", "b", "b,b", ""))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }

Review Comment:
   here https://github.com/apache/spark/pull/42398/files#diff-7de447aae84ff6752e962b088c3f62e552db3b58a4be52b69ea65da82cba6c27R198-R200



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349825602


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "holdenk (via GitHub)" <gi...@apache.org>.
holdenk commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1351070023


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(

Review Comment:
   Similarily lets put this in collect.scala with the other collect friends.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1351167115


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+      buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   I think this issue has been avoided if we refactor with `CollectList` or `extends Collect`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1368090659


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+      buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   Thanks @beliefer , I did some updated. Please check again. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369459076


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   It's a bit difficult for me to understand. The values of the two expressions should be bound before sorting. I don't quite understand the meaning of separate definitions and how to collect the corresponding values separately. Unless two `CollectList` are defined in `ListAgg`. Could you provide some demo? Please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369777027


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -36,8 +39,7 @@ import org.apache.spark.util.BoundedPriorityQueue
  * We have to store all the collected elements in memory, and so notice that too many elements
  * can cause GC paused and eventually OutOfMemory Errors.
  */
-abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T]
-  with UnaryLike[Expression] {
+abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] {

Review Comment:
   Change Collect is looks good now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370056246


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      val sorted = if (sameExpression) {
+        if (reverse) {
+          buffer.toSeq.sorted(ordering.reverse)
+        } else {
+          buffer.toSeq.sorted(ordering)
+        }
+      } else {
+        if (reverse) {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        } else {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+        }
+      }
+      UTF8String.fromString(sorted.map(_.toString)

Review Comment:
   You can define a sort function like
   
   ```
   private lazy val sortFunc = (sameExpression, reverse) match {
     case (true, true) => (buffer: mutable.ArrayBuffer[Any]) => buffer.toSeq.sorted(ordering.reverse)
     ...
   }
   ```
   
   and call the function here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42398: [SPARK-42746][SQL] Add the LISTAGG() aggregate function

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1287909249


##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -1148,6 +1148,60 @@ object functions {
    */
   def sum_distinct(e: Column): Column = withAggregateFunction(Sum(e.expr), isDistinct = true)
 
+  /**
+   * Aggregate function: returns the concatenated input values.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def listagg(e: Column): Column = withAggregateFunction {
+    ListAgg(e.expr)
+  }
+
+  /**
+   * Aggregate function: returns the concatenated input values.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def listagg(columnName: String): Column = listagg(Column(columnName))
+
+  /**
+   * Aggregate function: returns the concatenated input values, separated by the delimiter string.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def listagg(e: Column, delimiter: String): Column = withAggregateFunction {

Review Comment:
   Expose `Column` signature only (see the top of this file)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1359157402


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+      buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   You can composite them. Please refer `PercentileCont` or `RegrSlope`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369774164


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   I'm sorry. I get it now. `ListAgg` need save two fields. If so, please go on the path and push it forward.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369844354


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   > ListAgg need save two fields.
   
   Yes, thanks for your reply patiently. @beliefer 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1368419550


##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -988,6 +988,8 @@ primaryExpression
     | name=(PERCENTILE_CONT | PERCENTILE_DISC) LEFT_PAREN percentage=valueExpression RIGHT_PAREN
         WITHIN GROUP LEFT_PAREN ORDER BY sortItem RIGHT_PAREN
         (FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)? ( OVER windowSpec)?     #percentile
+    | LISTAGG LEFT_PAREN setQuantifier? aggEpxr=expression (COMMA delimiter=stringLit)? RIGHT_PAREN
+        WITHIN GROUP LEFT_PAREN ORDER BY sortItem RIGHT_PAREN (OVER windowSpec)?              #listAgg

Review Comment:
   ```suggestion
           WITHIN GROUP LEFT_PAREN ORDER BY sortItem RIGHT_PAREN (OVER windowSpec)?               #listAgg
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   Proxy mode can also be used.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   I think we should avoid change `Collect`. We can declare a field of `CollectList` and `ListAgg` extended `BinaryLike[Expression]`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1752242004

   @Hisoka-X Is `LISTAGG` an ANSI standard?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349829226


##########
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##########
@@ -158,6 +158,57 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }
 
+  test("SPARK-42746: listagg function") {
+    withTempView("df", "df2") {
+      Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b")
+        .createOrReplaceTempView("df")
+      checkAnswer(
+        sql("select listagg(b) from df group by a"),
+        Row("b,c") :: Row("c,d") :: Row(null) :: Nil)
+
+      checkAnswer(
+        sql("select listagg(b, '|') from df group by a"),
+        Row("b|c") :: Row("c|d") :: Row(null) :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(DISTINCT a) FROM df"),
+        Row("a,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a) FROM df"),
+        Row("a,a,b,b") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) FROM df"),
+        Row("b,b,a,a") :: Nil)
+
+      checkAnswer(
+        sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY a DESC) " +
+          "OVER (PARTITION BY b) FROM df"),
+        Row("a") :: Row("b,a") :: Row("b,a") :: Row("b") :: Row(null) :: Nil)
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT LISTAGG(a) WITHIN GROUP (ORDER BY b) FROM df")

Review Comment:
   oh, this is a mistake, let me fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349832245


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+      buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   It seems we only need set or list here.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+      buffer: OpenHashMap[AnyRef, Long],
+      input: InternalRow): OpenHashMap[AnyRef, Long] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val key = InternalRow.copyValue(value)

Review Comment:
   Why not use value as key?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1405586741


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -2214,6 +2214,30 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }
   }
 
+  /**
+   * Create a ListAgg expression.
+   */
+  override def visitListAgg(ctx: ListAggContext): AnyRef = {
+    val column = expression(ctx.aggEpxr)
+    val sortOrder = visitSortItem(ctx.sortItem)
+    val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
+    if (!column.semanticEquals(sortOrder.child) && isDistinct) {
+      throw QueryCompilationErrors.functionAndOrderExpressionMismatchError("LISTAGG", column,
+        sortOrder.child)
+    }
+    val delimiter = if (ctx.delimiter != null) Literal(ctx.delimiter.getText) else Literal(",")

Review Comment:
   fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #42398: [SPARK-42746][SQL] Add the LISTAGG() aggregate function

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1289432034


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -2203,6 +2203,38 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }
   }
 
+  /**
+   * Create a ListAgg expression.
+   */
+  override def visitListAgg(ctx: ListAggContext): AnyRef = {
+    val column = expression(ctx.aggEpxr)
+    val sortOrder = visitSortItem(ctx.sortItem)
+    if (!column.semanticEquals(sortOrder.child)) {
+      throw QueryCompilationErrors.functionAndOrderExpressionMismatchError("list_agg", column,

Review Comment:
   Fixed all! Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #42398: [SPARK-42746][SQL] Add the LISTAGG() aggregate function

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1287925774


##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -1148,6 +1148,60 @@ object functions {
    */
   def sum_distinct(e: Column): Column = withAggregateFunction(Sum(e.expr), isDistinct = true)
 
+  /**
+   * Aggregate function: returns the concatenated input values.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def listagg(e: Column): Column = withAggregateFunction {
+    ListAgg(e.expr)
+  }
+
+  /**
+   * Aggregate function: returns the concatenated input values.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def listagg(columnName: String): Column = listagg(Column(columnName))
+
+  /**
+   * Aggregate function: returns the concatenated input values, separated by the delimiter string.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def listagg(e: Column, delimiter: String): Column = withAggregateFunction {

Review Comment:
   Done, thanks for remind.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349881268


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(

Review Comment:
   `ListAgg` stores value too. I think reuse `CollectList` or `extends Collect` is better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349837194


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+      buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   So use List if without `DISTINCT` and use Set if `DISTINCT` specified.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349836243


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(

Review Comment:
   It would be worked. But seem like `CollectList` will use more memory because it store value not the number of times? Should I do that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369452950


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   You can declare another type field in `ListAgg`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370071343


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      val sorted = if (sameExpression) {
+        if (reverse) {
+          buffer.toSeq.sorted(ordering.reverse)
+        } else {
+          buffer.toSeq.sorted(ordering)
+        }
+      } else {
+        if (reverse) {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        } else {
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+        }
+      }
+      UTF8String.fromString(sorted.map(_.toString)

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369484714


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   ```
   ListAgg (...) extends TypedImperativeAggregate[mutable.ArrayBuffer[Any]] with BinaryLike[Expression] {
     private val collectList = CollectList()
     ...
     override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
       collectList.update(...)
     }
   
     override def merge(buffer: ArrayBuffer[Any],, other: Any): Any = {
       collectList.merge(...)
     }
   
     override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
       val ordering = PhysicalDataType.ordering(orderExpr.dataType)
       val sorted = // sort logic here
      
       collectList.eval(sorted)
     }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369980996


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),

Review Comment:
   addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369493701


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   Thanks @beliefer . But seem like it doesn't solve the problem of save another order field value. PS: you can check the code I commented, almost like you shared. https://github.com/apache/spark/pull/42398#discussion_r1368586674



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370996202


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala:
##########
@@ -478,6 +478,7 @@ object FunctionRegistry {
     expression[Percentile]("percentile"),
     expression[Median]("median"),
     expression[Skewness]("skewness"),
+    expression[ListAgg]("listagg"),

Review Comment:
   Because `ListAgg` extends `Collect` now, please put it with `CollectList` together.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      lazy val sortFunc = (sameExpression, reverse) match {
+        case (true, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering.reverse)
+        case (true, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.sorted(ordering)
+        case (false, true) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
+            child.dataType))
+        case (false, false) => (buffer: mutable.ArrayBuffer[Any]) =>
+          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
+            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
+            child.dataType))
+      }
+      val sorted = sortFunc(buffer)
+      UTF8String.fromString(sorted.map(_.toString)
+        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
+    } else {
+      UTF8String.fromString("")
+    }
+  }
+
+  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val v = if (sameExpression) {
+        convertToBufferElement(value)
+      } else {
+        InternalRow.apply(convertToBufferElement(value),
+          convertToBufferElement(orderExpression.eval(input)))

Review Comment:
   `InternalRow.apply(value, orderExpression.eval(input))`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(
+      newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    if (sameExpression) {
+      child.dataType
+    } else {
+      StructType(Seq(
+        StructField("value", child.dataType),
+        StructField("sortOrder", orderExpression.dataType)))
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      lazy val sortFunc = (sameExpression, reverse) match {

Review Comment:
   Please put `sortFunc` out of `eval`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression,
+    orderExpression: Expression,

Review Comment:
   ```suggestion
       orderExpression: Expression,
       delimiter: Expression,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42398: [SPARK-42746][SQL] Add the LISTAGG() aggregate function

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1287909122


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +

Review Comment:
   Should also probably register at `FunctionRegistry`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1368607666


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   Why need change `bufferElementType`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
+
+  override protected lazy val bufferElementType: DataType = {
+    StructType(Seq(
+      StructField("value", child.dataType),
+      StructField("sortOrder", orderExpression.dataType)))
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
+      val sorted = if (reverse) {
+        buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,

Review Comment:
   I think you just save the temp data into the `CollectList`'s buffer.
   Then you sort the `CollectList`'s buffer here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369943722


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -920,6 +920,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
         "fieldNames" -> v1Table.schema.fieldNames.mkString(", ")))
   }
 
+  def functionAndOrderExpressionMismatchError(
+      functionName: String,
+      functionExpr: Expression,
+      orderExpr: Expression): Throwable = {
+    new AnalysisException(
+      errorClass = "FUNCTION_AND_ORDER_EXPRESSION_MISMATCH",
+      messageParameters = Map(
+        "functionName" -> toSQLStmt(functionName),

Review Comment:
   ```suggestion
           "functionName" -> toSQLId(functionName),
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##########
@@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("a,b", "b,d"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "a,a", "b", "b,b", ""))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }

Review Comment:
   Please test the empty input.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),

Review Comment:
   Do we really need the default value?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,117 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with BinaryLike[Expression] {
+
+  def this(child: Expression) =
+    this(child, Literal.create(",", StringType), child, false, 0, 0)
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, child, false, 0, 0)
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = StringType
+
+  override def left: Expression = child
+
+  override def right: Expression = orderExpression
+
+  private lazy val sameExpression = orderExpression.semanticEquals(child)
+

Review Comment:
   Please put `createAggregationBuffer` here and move `withNewMutableAggBufferOffset`, `withNewInputAggBufferOffset` together with `createAggregationBuffer`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1368586674


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -245,3 +249,98 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       ""
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    orderExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]

Review Comment:
   I had tried with that. It seem like can't change bufferElementType in `CollectList` but `ListAgg` want to save two expression result. And this way will rewrite some method in `CollectList` like `eval`, `update`, so I didn't find the value of inheriting `CollectList`, it seems that it is only same with generic classes? Please point out my mistake. Thanks.
   
   These are code.
   <details>
   <summary>ListAgg</summary>
   
   ```scala
   case class ListAgg(
       child: Expression,
       delimiter: Expression = Literal.create(",", StringType),
       orderExpression: Expression,
       reverse: Boolean = false,
       mutableAggBufferOffset: Int = 0,
       inputAggBufferOffset: Int = 0) extends TypedImperativeAggregate[mutable.ArrayBuffer[Any]]
     with BinaryLike[Expression] {
   
     def this(child: Expression) =
       this(child, Literal.create(",", StringType), child, false, 0, 0)
     def this(child: Expression, delimiter: Expression) =
       this(child, delimiter, child, false, 0, 0)
   
     private lazy val collect = CollectList(child, mutableAggBufferOffset, inputAggBufferOffset)
   
   //  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
     override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))
     
   // TODO seem like hard to change bufferElementType in collect
   //  override protected lazy val bufferElementType: DataType = {
   //    StructType(Seq(
   //      StructField("value", child.dataType),
   //      StructField("sortOrder", orderExpression.dataType)))
   //  }
   
     override def merge(buffer: ArrayBuffer[Any], input: ArrayBuffer[Any]): ArrayBuffer[Any] = {
       collect.merge(buffer, input)
     }
   
     override def serialize(buffer: ArrayBuffer[Any]): Array[Byte] = {
       collect.serialize(buffer)
     }
   
     override def deserialize(storageFormat: Array[Byte]): ArrayBuffer[Any] = {
       collect.deserialize(storageFormat)
     }
   
     override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
       if (buffer.nonEmpty) {
         val ordering = PhysicalDataType.ordering(orderExpression.dataType)
         val sorted = if (reverse) {
           buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
             orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
             child.dataType))
         } else {
           buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].toSeq.sortBy(_.get(1,
             orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
             child.dataType))
         }
         UTF8String.fromString(sorted.map(_.toString)
           .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
       } else {
         UTF8String.fromString("")
       }
     }
   
     override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
       val value = child.eval(input)
       if (value != null) {
         buffer += InternalRow.apply(collect.convertToBufferElement(value),
           collect.convertToBufferElement(orderExpression.eval(input)))
       }
       buffer
     }
   
     override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
   
     override def withNewMutableAggBufferOffset(
         newMutableAggBufferOffset: Int) : ImperativeAggregate =
       copy(mutableAggBufferOffset = newMutableAggBufferOffset)
   
     override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
       copy(inputAggBufferOffset = newInputAggBufferOffset)
   
     override def nullable: Boolean = false
   
     override def dataType: DataType = StringType
   
     override def left: Expression = child
   
     override def right: Expression = orderExpression
   
     override protected def withNewChildrenInternal(
         newLeft: Expression,
         newRight: Expression): Expression = {
       copy(child = newLeft, orderExpression = newRight)
     }
   }
   ```
   
   </details>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369978917


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##########
@@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("a,b", "b,d"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "a,a", "b", "b,b", ""))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }

Review Comment:
   Can this prove?
   https://github.com/apache/spark/pull/42398/files#diff-7de447aae84ff6752e962b088c3f62e552db3b58a4be52b69ea65da82cba6c27R169-R171



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370041468


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##########
@@ -603,6 +603,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("a,b,c", "b,c,d"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("a,b", "b,d"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "a,a", "b", "b,b", ""))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }

Review Comment:
   Is there have the test case with different input column and sort column?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349810729


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)

Review Comment:
   It used for sql expression, it will be invoked when call from sql like `listagg(b)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349833814


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+      buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   Set or List can not store duplicate value. It to make sure invoke it work right without `DISTINCT` expression. Eg `a` and `a` should be `a,a` not `a`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1349839317


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+      buffer: OpenHashMap[AnyRef, Long],
+      input: InternalRow): OpenHashMap[AnyRef, Long] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val key = InternalRow.copyValue(value)

Review Comment:
   It could be `UTF8String`, without this, the Map key will be changed after put key into map. Other place also use this way like `CollectList`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1355031438


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ListAgg.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.OpenHashMap
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns the concatenated input values," +
+    " separated by the delimiter string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a,b,c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       a,a
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+       a,b
+      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
+       a|b
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal.create(",", StringType),
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
+  with UnaryLike[Expression] {
+
+  def this(child: Expression) = this(child, Literal.create(",", StringType), false, 0, 0)
+  def this(child: Expression, delimiter: Expression) = this(child, delimiter, false, 0, 0)
+
+  override def update(
+      buffer: OpenHashMap[AnyRef, Long],

Review Comment:
   Hi @holdenk @beliefer , I did some update for `extends Collect`. But there are some problems that bother me. ListAgg needs to save two expressions separately because the expressions used for aggregation and sorting may be different. However, Collect is an implementation of `UnaryLike` that can only have one child, resulting in another expression that cannot be parsed. Any suggestions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #42398: [SPARK-42746][SQL] Add the LISTAGG() aggregate function
URL: https://github.com/apache/spark/pull/42398


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org