Posted to reviews@spark.apache.org by "nikolamand-db (via GitHub)" <gi...@apache.org> on 2024/03/20 09:59:01 UTC

[PR] [SPARK-47483] Add support for aggregation and join operations on arrays of collated strings [spark]

nikolamand-db opened a new pull request, #45611:
URL: https://github.com/apache/spark/pull/45611

   
   
   ### What changes were proposed in this pull request?
   
   Example of aggregation sequence:
   ```
   create table t(a array<string collate utf8_binary_lcase>) using parquet;
   
   insert into t(a) values(array('a' collate utf8_binary_lcase));
   insert into t(a) values(array('A' collate utf8_binary_lcase));
   
   select distinct a from t; 
   ```
   Example of join sequence:
   ```
   create table l(a array<string collate utf8_binary_lcase>) using parquet;
   create table r(a array<string collate utf8_binary_lcase>) using parquet;
   
   insert into l(a) values(array('a' collate utf8_binary_lcase));
   insert into r(a) values(array('A' collate utf8_binary_lcase));
   
   select * from l join r where l.a = r.a;
   ```
   Both queries should return a single row, since the arrays compare as equal under the case-insensitive collation.
   
   The problem is in the `isBinaryStable` function, which should return false if **any** of its subtypes is a non-binary collated string.
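   For context, the check the fix implies can be sketched as a recursive walk over the type tree: a type is binary-stable only if no nested element or field type is a collated string with a non-binary collation. The ADT and names below are a simplified stand-in for Spark's `DataType` hierarchy, not the actual implementation:
   ```scala
   // Minimal sketch of the recursive binary-stability check described above.
   // SqlType is a toy model of Spark's DataType tree, not the real API.
   sealed trait SqlType
   case class StringType(collation: String) extends SqlType
   case class ArrayType(element: SqlType) extends SqlType
   case class StructType(fields: Seq[SqlType]) extends SqlType
   case object IntType extends SqlType

   object BinaryStability {
     // Returns false if ANY nested subtype is a non-binary collated string.
     def isBinaryStable(t: SqlType): Boolean = t match {
       case StringType(collation) => collation == "utf8_binary"
       case ArrayType(element)    => isBinaryStable(element)
       case StructType(fields)    => fields.forall(isBinaryStable)
       case _                     => true
     }
   }
   ```
   Under this model, `array<string collate utf8_binary_lcase>` is not binary-stable, so aggregation and join operators cannot compare such values byte-for-byte and must fall back to collation-aware comparison.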
   
   ### Why are the changes needed?
   
   To properly support aggregations and joins on arrays of collated strings.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, it fixes the described scenarios.
   
   ### How was this patch tested?
   
   Added new checks to the collation suite and updated the golden file.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1531862474


##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -639,4 +639,169 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
         "expressionStr" -> "SUBSTRING(struct1.a, 0, 1)",
         "reason" -> "generation expression cannot contain non-default collated string type"))
   }
+
+  test("Aggregation of arrays built on collated strings") {
+    val tableName = "test_agg_arr_collated"
+    val simple = Seq(
+      // binary
+      ("utf8_binary", Seq("array('aaa')", "array('AAA')"),
+        Seq((Seq("aaa"), 1), (Seq("AAA"), 1))),
+      ("utf8_binary", Seq("array('aaa', 'bbb')", "array('AAA', 'BBB')"),
+        Seq((Seq("aaa", "bbb"), 1), (Seq("AAA", "BBB"), 1))),
+      ("utf8_binary", Seq("array('aaa')", "array('bbb')", "array('AAA')", "array('BBB')"),
+        Seq((Seq("aaa"), 1), (Seq("bbb"), 1), (Seq("AAA"), 1), (Seq("BBB"), 1))),
+      // non-binary
+      ("utf8_binary_lcase",
+        Seq(
+          "array('aaa' collate utf8_binary_lcase)",
+          "array('AAA' collate utf8_binary_lcase)"
+        ),
+        Seq((Seq("aaa"), 2))),
+      ("utf8_binary_lcase",
+        Seq(
+          "array('aaa' collate utf8_binary_lcase, 'bbb' collate utf8_binary_lcase)",
+          "array('AAA' collate utf8_binary_lcase, 'BBB' collate utf8_binary_lcase)"
+        ),
+        Seq((Seq("aaa", "bbb"), 2))),
+      ("utf8_binary_lcase",
+        Seq(
+          "array('aaa' collate utf8_binary_lcase)",
+          "array('bbb' collate utf8_binary_lcase)",
+          "array('AAA' collate utf8_binary_lcase)",
+          "array('BBB' collate utf8_binary_lcase)"
+        ),
+        Seq((Seq("aaa"), 2), (Seq("bbb"), 2)))
+    )
+    val nested = Seq(
+      // binary
+      ("utf8_binary", Seq("array(array('aaa'))", "array(array('AAA'))"),
+        Seq((Seq(Seq("aaa")), 1), (Seq(Seq("AAA")), 1))),
+      ("utf8_binary", Seq("array(array('aaa'), array('bbb'))", "array(array('AAA'), array('bbb'))"),
+        Seq((Seq(Seq("aaa"), Seq("bbb")), 1), (Seq(Seq("AAA"), Seq("bbb")), 1))),
+      // non-binary
+      ("utf8_binary_lcase",
+        Seq(
+          "array(array('aaa' collate utf8_binary_lcase))",
+          "array(array('AAA' collate utf8_binary_lcase))"
+        ),
+        Seq((Seq(Seq("aaa")), 2))),
+      ("utf8_binary_lcase",
+        Seq(
+          "array(array('aaa' collate utf8_binary_lcase), array('bbb' collate utf8_binary_lcase))",
+          "array(array('AAA' collate utf8_binary_lcase), array('bbb' collate utf8_binary_lcase))"
+        ),
+        Seq((Seq(Seq("aaa"), Seq("bbb")), 2))),
+      ("utf8_binary_lcase",
+        Seq(
+          "array(array('aaa' collate utf8_binary_lcase, 'AAA' collate utf8_binary_lcase)," +
+            "array('bbb' collate utf8_binary_lcase, 'ccc' collate utf8_binary_lcase))",
+          "array(array('aaa' collate utf8_binary_lcase, 'aaa' collate utf8_binary_lcase)," +
+            "array('bbb' collate utf8_binary_lcase, 'ccc' collate utf8_binary_lcase)," +
+            "array('ddd' collate utf8_binary_lcase))",
+          "array(array('AAA' collate utf8_binary_lcase, 'aaa' collate utf8_binary_lcase)," +
+            "array('BBB' collate utf8_binary_lcase, 'CCC' collate utf8_binary_lcase))"
+        ),
+        Seq(
+          (Seq(Seq("aaa", "AAA"), Seq("bbb", "ccc")), 2),
+          (Seq(Seq("aaa", "aaa"), Seq("bbb", "ccc"), Seq("ddd")), 1)
+        )
+      )
+    )
+
+    val all = simple.map {
+      case (collName, data, res) => (s"array<string collate $collName>", data, res)
+    } ++ nested.map {
+      case (collName, data, res) => (s"array<array<string collate $collName>>", data, res)
+    }
+
+    all.foreach {
+      case (dt, rows, count) =>
+        withTable(tableName) {
+          sql(s"create table $tableName(a $dt) using parquet")
+          rows.map(row => sql(s"insert into $tableName(a) values($row)"))
+          checkAnswer(sql(s"select a, count(*) from $tableName group by a"),
+            count.map { case (aggStr, cnt) => Row(aggStr, cnt) })
+          checkAnswer(sql(s"select distinct a from $tableName"),
+            count.map{ case (aggStr, _) => Row(aggStr)})
+        }
+    }
+  }
+
+  test("Join on arrays of collated strings") {
+    val tablePrefix = "test_join_arr_collated"
+    val tableLeft = s"${tablePrefix}_left"
+    val tableRight = s"${tablePrefix}_right"
+    val simple = Seq(
+      // binary
+      ("utf8_binary", Seq("array('aaa')"), Seq("array('AAA')"), Seq()),
+      // non-binary
+      ("utf8_binary_lcase",
+        Seq("array('aaa' collate utf8_binary_lcase)"),
+        Seq("array('AAA' collate utf8_binary_lcase)"),
+        Seq(Seq("aaa"))),
+      ("utf8_binary_lcase",
+        Seq("array('aaa' collate utf8_binary_lcase, 'bbb' collate utf8_binary_lcase)"),
+        Seq("array('AAA' collate utf8_binary_lcase, 'BBB' collate utf8_binary_lcase)"),
+        Seq(Seq("aaa", "bbb"))),
+      ("utf8_binary_lcase",
+        Seq("array('aaa' collate utf8_binary_lcase)", "array('bbb' collate utf8_binary_lcase)"),
+        Seq("array('AAA' collate utf8_binary_lcase)", "array('BBB' collate utf8_binary_lcase)"),
+        Seq(Seq("aaa"), Seq("bbb"))),
+      ("utf8_binary_lcase",
+        Seq("array('aaa' collate utf8_binary_lcase)", "array('bbb' collate utf8_binary_lcase)"),
+        Seq("array('AAAA' collate utf8_binary_lcase)", "array('BBBB' collate utf8_binary_lcase)"),
+        Seq())
+    )
+    val nested = Seq(
+      // binary
+      ("utf8_binary", Seq("array(array('aaa'))"), Seq("array(array('AAA'))"), Seq()),
+      // non-binary
+      ("utf8_binary_lcase",
+        Seq("array(array('aaa' collate utf8_binary_lcase))"),
+        Seq("array(array('AAA' collate utf8_binary_lcase))"),
+        Seq(Seq(Seq("aaa")))),
+      ("utf8_binary_lcase",
+        Seq("array(array('aaa' collate utf8_binary_lcase, 'bbb' collate utf8_binary_lcase))"),
+        Seq("array(array('AAA' collate utf8_binary_lcase, 'BBB' collate utf8_binary_lcase))"),
+        Seq(Seq(Seq("aaa", "bbb")))),
+      ("utf8_binary_lcase",
+        Seq("array(array('aaa' collate utf8_binary_lcase)," +
+          "array('bbb' collate utf8_binary_lcase))"),
+        Seq("array(array('AAA' collate utf8_binary_lcase)," +
+          "array('BBB' collate utf8_binary_lcase))"),
+        Seq(Seq(Seq("aaa"), Seq("bbb")))),
+      ("utf8_binary_lcase",
+        Seq("array(array('aaa' collate utf8_binary_lcase)," +
+          "array('bbb' collate utf8_binary_lcase))"),
+        Seq("array(array('AAA' collate utf8_binary_lcase)," +
+          "array('CCC' collate utf8_binary_lcase))"),
+        Seq())
+    )
+
+    val all = simple.map {
+      case (collName, l, r, res) => (s"array<string collate $collName>", l, r, res)
+    } ++ nested.map {
+      case (collName, l, r, res) => (s"array<array<string collate $collName>>", l, r, res)
+    }
+
+    all.foreach {
+      case (dt, dataLeft, dataRight, res) =>
+        withTable(tableLeft) {
+          withTable(tableRight) {
+            Seq(tableLeft, tableRight).map(tab => sql(s"create table $tab(a $dt) using parquet"))
+            Seq((tableLeft, dataLeft), (tableRight, dataRight)).foreach {
+              case (tab, data) => data.map(row => sql(s"insert into $tab(a) values($row)"))
+            }
+            checkAnswer(
+              sql(
+                s"""
+                   |select $tableLeft.a from $tableLeft join $tableRight
+                   |where $tableLeft.a = $tableRight.a
+                   |""".stripMargin),
+              res.map(Row(_))
+            )
+          }
+        }
+    }
+  }

Review Comment:
   LGTM, but I would prefer if you could simplify the test cases.





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1532194050


##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -639,4 +641,184 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
         "expressionStr" -> "SUBSTRING(struct1.a, 0, 1)",
         "reason" -> "generation expression cannot contain non-default collated string type"))
   }
+
+  trait ArrayCheck {
+    def dataType: String
+    def dataTypeCollated: String
+  }
+
+  trait ArrayCheckSimple extends ArrayCheck {
+    override def dataType: String = s"array<string>"
+    override def dataTypeCollated: String = s"array<string collate utf8_binary_lcase>"
+  }
+
+  trait ArrayCheckNested extends ArrayCheck {
+    override def dataType: String = s"array<array<string>>"
+    override def dataTypeCollated: String = s"array<array<string collate utf8_binary_lcase>>"
+  }
+
+  test("Aggregation of arrays built on collated strings") {
+    abstract class AggCheck(val rows: Seq[String], val result: Seq[(Any, Int)]) extends ArrayCheck
+
+    case class AggCheckSimple(override val rows: Seq[String],
+                              override val result: Seq[(Seq[String], Int)])
+      extends AggCheck(rows, result) with ArrayCheckSimple
+
+    case class AggCheckNested(override val rows: Seq[String],
+                              override val result: Seq[(Seq[Seq[String]], Int)])
+      extends AggCheck(rows, result) with ArrayCheckNested
+
+    val tableName = "test_agg_arr_collated"
+    val tableNameLowercase = "test_agg_arr_collated_lowercase"
+
+    Seq(
+      // simple
+      AggCheckSimple(
+        rows = Seq("array('aaa')", "array('AAA')"),
+        result = Seq((Seq("aaa"), 2))
+      ),
+      AggCheckSimple(
+        rows = Seq("array('aaa', 'bbb')", "array('AAA', 'BBB')"),
+        result = Seq((Seq("aaa", "bbb"), 2))
+      ),
+      AggCheckSimple(
+        rows = Seq("array('aaa')", "array('bbb')", "array('AAA')", "array('BBB')"),
+        result = Seq((Seq("aaa"), 2), (Seq("bbb"), 2))
+      ),
+      // nested
+      AggCheckNested(
+        rows = Seq("array(array('aaa'))", "array(array('AAA'))"),
+        result = Seq((Seq(Seq("aaa")), 2))
+      ),
+      AggCheckNested(
+        rows = Seq("array(array('aaa'), array('bbb'))", "array(array('AAA'), array('bbb'))"),
+        result = Seq((Seq(Seq("aaa"), Seq("bbb")), 2))
+      ),
+      AggCheckNested(
+        rows = Seq(
+          "array(array('aaa', 'aaa'), array('bbb', 'ccc'))",
+          "array(array('aaa', 'aaa'), array('bbb', 'ccc'), array('ddd'))",
+          "array(array('AAA', 'AAA'), array('BBB', 'CCC'))"
+        ),
+        result = Seq(
+          (Seq(Seq("aaa", "aaa"), Seq("bbb", "ccc")), 2),
+          (Seq(Seq("aaa", "aaa"), Seq("bbb", "ccc"), Seq("ddd")), 1)
+        )
+      )
+    ).map((check: AggCheck) =>
+        withTable(tableName, tableNameLowercase) {
+          def checkResults(table: String): Unit = {
+            checkAnswer(sql(s"select a, count(*) from $table group by a"),
+              check.result.map{ case (agg, cnt) => Row(agg, cnt) })
+            checkAnswer(sql(s"select distinct a from $table"),
+              check.result.map{ case (agg, _) => Row(agg) })
+          }
+
+          // check against non-binary collation
+          sql(s"create table $tableName(a ${check.dataTypeCollated}) using parquet")
+          check.rows.map(row => sql(s"insert into $tableName(a) values($row)"))
+          checkResults(tableName)
+
+          // binary collation with values converted to lowercase should match the results as well
+          sql(s"create table $tableNameLowercase(a ${check.dataType}) using parquet")
+          check.rows.map(row =>
+            sql(s"insert into $tableNameLowercase(a) values(${row.toLowerCase(Locale.ROOT)})"))

Review Comment:
   Can you use the `UTF8String.toLowerCase` function instead of `java.util.Locale`? For this case it is OK, but we shouldn't start mixing the `ICU` collator with Java's default one.
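   The concern above can be illustrated with plain JDK behavior (a small standalone demonstration, not Spark code): Java's `String.toLowerCase(Locale)` is locale-sensitive, so the result of lowercasing depends on which locale the test happens to pick. The classic case is the Turkish dotless i:
   ```scala
   import java.util.Locale

   // Locale-sensitive case mapping: under the Turkish locale,
   // uppercase "I" lowercases to dotless 'ı' (U+0131), not 'i'.
   val root    = "TITLE".toLowerCase(Locale.ROOT)      // "title"
   val turkish = "TITLE".toLowerCase(new Locale("tr")) // "tıtle"

   println(root)
   println(turkish)
   println(root == turkish) // false
   ```
   Routing through `UTF8String.toLowerCase` keeps the lowercasing consistent with the semantics Spark's own collation code uses, instead of depending on a JVM locale.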





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1531812913


##########
sql/core/src/test/resources/sql-tests/inputs/collations.sql:
##########
@@ -77,3 +77,18 @@ select array_distinct(array('aaa' collate utf8_binary_lcase, 'AAA' collate utf8_
 select array_union(array('aaa' collate utf8_binary_lcase), array('AAA' collate utf8_binary_lcase));
 select array_intersect(array('aaa' collate utf8_binary_lcase), array('AAA' collate utf8_binary_lcase));
 select array_except(array('aaa' collate utf8_binary_lcase), array('AAA' collate utf8_binary_lcase));
+
+-- array joins & aggregates
+create table l(a array<string collate utf8_binary_lcase>) using parquet;

Review Comment:
   Seems like golden file examples are already covered by `CollationSuite`? We don't need both.





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on PR #45611:
URL: https://github.com/apache/spark/pull/45611#issuecomment-2014487381

   +1, LGTM. Merging to master.
   Thank you, @nikolamand-db and @dbatomic @cloud-fan @HyukjinKwon for review.




Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1534270999


##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -640,6 +641,201 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
         "reason" -> "generation expression cannot contain non-default collated string type"))
   }
 
+  trait ArrayCheck {

Review Comment:
   Why don't you put the new code to the end of the test suite?



##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -640,6 +641,201 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
         "reason" -> "generation expression cannot contain non-default collated string type"))
   }
 
+  trait ArrayCheck {
+    def dataType: String
+    def dataTypeCollated: String
+  }
+
+  trait ArrayCheckSimple extends ArrayCheck {
+    override def dataType: String = "array<string>"
+    override def dataTypeCollated: String = "array<string collate utf8_binary_lcase>"
+  }
+
+  trait ArrayCheckNested extends ArrayCheck {
+    override def dataType: String = "array<array<string>>"
+    override def dataTypeCollated: String = "array<array<string collate utf8_binary_lcase>>"
+  }
+
+  test("Aggregation of arrays built on collated strings") {
+    abstract class AggCheck(val rows: Seq[String], val result: Seq[(Any, Int)]) extends ArrayCheck
+
+    case class AggCheckSimple(
+        override val rows: Seq[String],
+        override val result: Seq[(Seq[String], Int)])
+      extends AggCheck(rows, result) with ArrayCheckSimple
+
+    case class AggCheckNested(
+        override val rows: Seq[String],
+        override val result: Seq[(Seq[Seq[String]], Int)])
+      extends AggCheck(rows, result) with ArrayCheckNested
+
+    val tableName = "test_agg_arr_collated"
+    val tableNameLowercase = "test_agg_arr_collated_lowercase"
+
+    Seq(
+      // simple
+      AggCheckSimple(
+        rows = Seq("array('aaa')", "array('AAA')"),
+        result = Seq((Seq("aaa"), 2))
+      ),
+      AggCheckSimple(
+        rows = Seq("array('aaa', 'bbb')", "array('AAA', 'BBB')"),
+        result = Seq((Seq("aaa", "bbb"), 2))
+      ),
+      AggCheckSimple(
+        rows = Seq("array('aaa')", "array('bbb')", "array('AAA')", "array('BBB')"),
+        result = Seq((Seq("aaa"), 2), (Seq("bbb"), 2))
+      ),
+      // nested
+      AggCheckNested(
+        rows = Seq("array(array('aaa'))", "array(array('AAA'))"),
+        result = Seq((Seq(Seq("aaa")), 2))
+      ),
+      AggCheckNested(
+        rows = Seq("array(array('aaa'), array('bbb'))", "array(array('AAA'), array('bbb'))"),
+        result = Seq((Seq(Seq("aaa"), Seq("bbb")), 2))
+      ),
+      AggCheckNested(
+        rows = Seq(
+          "array(array('aaa', 'aaa'), array('bbb', 'ccc'))",
+          "array(array('aaa', 'aaa'), array('bbb', 'ccc'), array('ddd'))",
+          "array(array('AAA', 'AAA'), array('BBB', 'CCC'))"
+        ),
+        result = Seq(
+          (Seq(Seq("aaa", "aaa"), Seq("bbb", "ccc")), 2),
+          (Seq(Seq("aaa", "aaa"), Seq("bbb", "ccc"), Seq("ddd")), 1)
+        )
+      )
+    ).map((check: AggCheck) =>
+        withTable(tableName, tableNameLowercase) {
+          def checkResults(table: String): Unit = {
+            checkAnswer(sql(s"select a, count(*) from $table group by a"),
+              check.result.map{ case (agg, cnt) => Row(agg, cnt) })
+            checkAnswer(sql(s"select distinct a from $table"),
+              check.result.map{ case (agg, _) => Row(agg) })
+          }
+
+          // check against non-binary collation
+          sql(s"create table $tableName(a ${check.dataTypeCollated}) using parquet")
+          check.rows.map(row => sql(s"insert into $tableName(a) values($row)"))
+          checkResults(tableName)
+
+          // binary collation with values converted to lowercase should match the results as well
+          sql(s"create table $tableNameLowercase(a ${check.dataType}) using parquet")
+          check.rows.map(row =>
+            // scalastyle:off caselocale
+            sql(
+              s"""
+                 |insert into $tableNameLowercase(a)
+                 |values(${UTF8String.fromString(row).toLowerCase})
+                 |""".stripMargin)
+            // scalastyle:on caselocale
+          )
+          checkResults(tableNameLowercase)
+        }
+    )
+  }
+
+  test("Join on arrays of collated strings") {
+    abstract class JoinCheck(
+        val leftRows: Seq[String],
+        val rightRows: Seq[String],
+        val resultRows: Seq[Any])
+      extends ArrayCheck
+
+    case class JoinSimpleCheck(
+        override val leftRows: Seq[String],
+        override val rightRows: Seq[String],
+        override val resultRows: Seq[Seq[String]])
+      extends JoinCheck(leftRows, rightRows, resultRows) with ArrayCheckSimple
+
+    case class JoinNestedCheck(
+        override val leftRows: Seq[String],
+        override val rightRows: Seq[String],
+        override val resultRows: Seq[Seq[Seq[String]]])
+      extends JoinCheck(leftRows, rightRows, resultRows) with ArrayCheckNested
+
+    val tablePrefix = "test_join_arr_collated"
+    val tableLeft = s"${tablePrefix}_left"
+    val tableLeftLowercase = s"${tableLeft}_lowercase"
+    val tableRight = s"${tablePrefix}_right"
+    val tableRightLowercase = s"${tableRight}_lowercase"
+
+    Seq(
+      // simple
+      JoinSimpleCheck(
+        leftRows = Seq("array('aaa')"),
+        rightRows = Seq("array('AAA')"),
+        resultRows = Seq(Seq("aaa"))
+      ),
+      JoinSimpleCheck(
+        leftRows = Seq("array('aaa', 'bbb')"),
+        rightRows = Seq("array('AAA', 'BBB')"),
+        resultRows = Seq(Seq("aaa", "bbb"))
+      ),
+      JoinSimpleCheck(
+        leftRows = Seq("array('aaa')", "array('bbb')"),
+        rightRows = Seq("array('AAA')", "array('BBB')"),
+        resultRows = Seq(Seq("aaa"), Seq("bbb"))
+      ),
+      JoinSimpleCheck(
+        leftRows = Seq("array('aaa')", "array('bbb')"),
+        rightRows = Seq("array('AAAA')", "array('BBBB')"),
+        resultRows = Seq()
+      ),
+      // nested
+      JoinNestedCheck(
+        leftRows = Seq("array(array('aaa'))"),
+        rightRows = Seq("array(array('AAA'))"),
+        resultRows = Seq(Seq(Seq("aaa")))
+      ),
+      JoinNestedCheck(
+        leftRows = Seq("array(array('aaa', 'bbb'))"),
+        rightRows = Seq("array(array('AAA', 'BBB'))"),
+        resultRows = Seq(Seq(Seq("aaa", "bbb")))
+      ),
+      JoinNestedCheck(
+        Seq("array(array('aaa'), array('bbb'))"),
+        Seq("array(array('AAA'), array('BBB'))"),
+        Seq(Seq(Seq("aaa"), Seq("bbb")))
+      ),
+      JoinNestedCheck(
+        Seq("array(array('aaa'), array('bbb'))"),
+        Seq("array(array('AAA'), array('CCC'))"),
+        Seq()
+      )
+    ).map((check: JoinCheck) =>

Review Comment:
   Better to use `.foreach` here. `map` is usually used to build a new collection, which is not needed in this case.





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1531856491


##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -639,4 +639,169 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
         "expressionStr" -> "SUBSTRING(struct1.a, 0, 1)",
         "reason" -> "generation expression cannot contain non-default collated string type"))
   }
+
+  test("Aggregation of arrays built on collated strings") {
+    val tableName = "test_agg_arr_collated"
+    val simple = Seq(
+      // binary
+      ("utf8_binary", Seq("array('aaa')", "array('AAA')"),

Review Comment:
   Can you use named case objects instead of tuples? Not sure about the others, but I can't read this :)





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1533994036


##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -640,6 +641,201 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
         "reason" -> "generation expression cannot contain non-default collated string type"))
   }
 
+  trait ArrayCheck {
+    def dataType: String
+    def dataTypeCollated: String
+  }
+
+  trait ArrayCheckSimple extends ArrayCheck {
+    override def dataType: String = "array<string>"
+    override def dataTypeCollated: String = "array<string collate utf8_binary_lcase>"
+  }
+
+  trait ArrayCheckNested extends ArrayCheck {
+    override def dataType: String = "array<array<string>>"
+    override def dataTypeCollated: String = "array<array<string collate utf8_binary_lcase>>"
+  }
+
+  test("Aggregation of arrays built on collated strings") {
+    abstract class AggCheck(val rows: Seq[String], val result: Seq[(Any, Int)]) extends ArrayCheck
+
+    case class AggCheckSimple(
+        override val rows: Seq[String],
+        override val result: Seq[(Seq[String], Int)])
+      extends AggCheck(rows, result) with ArrayCheckSimple
+
+    case class AggCheckNested(
+        override val rows: Seq[String],
+        override val result: Seq[(Seq[Seq[String]], Int)])
+      extends AggCheck(rows, result) with ArrayCheckNested
+
+    val tableName = "test_agg_arr_collated"
+    val tableNameLowercase = "test_agg_arr_collated_lowercase"
+
+    Seq(
+      // simple
+      AggCheckSimple(
+        rows = Seq("array('aaa')", "array('AAA')"),
+        result = Seq((Seq("aaa"), 2))
+      ),
+      AggCheckSimple(
+        rows = Seq("array('aaa', 'bbb')", "array('AAA', 'BBB')"),
+        result = Seq((Seq("aaa", "bbb"), 2))
+      ),
+      AggCheckSimple(
+        rows = Seq("array('aaa')", "array('bbb')", "array('AAA')", "array('BBB')"),
+        result = Seq((Seq("aaa"), 2), (Seq("bbb"), 2))
+      ),
+      // nested
+      AggCheckNested(
+        rows = Seq("array(array('aaa'))", "array(array('AAA'))"),
+        result = Seq((Seq(Seq("aaa")), 2))
+      ),
+      AggCheckNested(
+        rows = Seq("array(array('aaa'), array('bbb'))", "array(array('AAA'), array('bbb'))"),
+        result = Seq((Seq(Seq("aaa"), Seq("bbb")), 2))
+      ),
+      AggCheckNested(
+        rows = Seq(
+          "array(array('aaa', 'aaa'), array('bbb', 'ccc'))",
+          "array(array('aaa', 'aaa'), array('bbb', 'ccc'), array('ddd'))",
+          "array(array('AAA', 'AAA'), array('BBB', 'CCC'))"
+        ),
+        result = Seq(
+          (Seq(Seq("aaa", "aaa"), Seq("bbb", "ccc")), 2),
+          (Seq(Seq("aaa", "aaa"), Seq("bbb", "ccc"), Seq("ddd")), 1)
+        )
+      )
+    ).map((check: AggCheck) =>
+        withTable(tableName, tableNameLowercase) {
+          def checkResults(table: String): Unit = {
+            checkAnswer(sql(s"select a, count(*) from $table group by a"),

Review Comment:
   It's bad practice to try to cover all cases in **end-to-end** tests. We should improve the coverage in unit tests instead.
   
   I suggest we add tests in `UnsafeRowUtilsSuite` to test the `isBinaryStable` function with different cases: array of string, array of array of string, struct of array of string, etc. The end-to-end test here should just run a few queries to show it works.
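
To illustrate the kind of unit coverage being suggested, here is a self-contained sketch that models the recursion `isBinaryStable` needs (toy types, not Spark's actual `DataType` classes): the check should return false as soon as any nested string type uses a non-binary collation, whether it sits inside an array, a nested array, or a struct.

```scala
// Toy model of the type hierarchy; names mirror Spark's but these are
// standalone illustrative classes, not the real ones.
sealed trait DataType
case class StringType(binaryCollation: Boolean) extends DataType
case class ArrayType(element: DataType) extends DataType
case class StructType(fields: Seq[DataType]) extends DataType
case object IntegerType extends DataType

object BinaryStability {
  // A type is binary stable iff no nested string uses a non-binary collation.
  def isBinaryStable(dt: DataType): Boolean = dt match {
    case StringType(binary) => binary
    case ArrayType(e)       => isBinaryStable(e)
    case StructType(fs)     => fs.forall(isBinaryStable)
    case _                  => true
  }

  def main(args: Array[String]): Unit = {
    val lcase  = StringType(binaryCollation = false)
    val binary = StringType(binaryCollation = true)
    assert(isBinaryStable(ArrayType(binary)))                // array<string>
    assert(!isBinaryStable(ArrayType(lcase)))                // array<string collate lcase>
    assert(!isBinaryStable(ArrayType(ArrayType(lcase))))     // array<array<string collate lcase>>
    assert(!isBinaryStable(StructType(Seq(IntegerType, ArrayType(lcase)))))
    assert(isBinaryStable(StructType(Seq(IntegerType, binary))))
  }
}
```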





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1534247221


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala:
##########
@@ -204,8 +204,8 @@ object UnsafeRowUtils {
    * e.g. this is not true for non-binary collations (any case/accent insensitive collation
    * can lead to rows being semantically equal even though their binary representations differ).
    */
-  def isBinaryStable(dataType: DataType): Boolean = dataType.existsRecursively {
-    case st: StringType => CollationFactory.fetchCollation(st.collationId).isBinaryCollation
-    case _ => true
+  def isBinaryStable(dataType: DataType): Boolean = !dataType.existsRecursively {

Review Comment:
   Why is `isBinaryStable` in `UnsafeRowUtils`? Is the implementation somehow bound to unsafe rows?
   
   Why is it not in `DataTypeUtils`, for example?





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "nikolamand-db (via GitHub)" <gi...@apache.org>.
nikolamand-db commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1532172271


##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -639,4 +639,169 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
         "expressionStr" -> "SUBSTRING(struct1.a, 0, 1)",
         "reason" -> "generation expression cannot contain non-default collated string type"))
   }
+
+  test("Aggregation of arrays built on collated strings") {
+    val tableName = "test_agg_arr_collated"
+    val simple = Seq(
+      // binary
+      ("utf8_binary", Seq("array('aaa')", "array('AAA')"),
+        Seq((Seq("aaa"), 1), (Seq("AAA"), 1))),
+      ("utf8_binary", Seq("array('aaa', 'bbb')", "array('AAA', 'BBB')"),
+        Seq((Seq("aaa", "bbb"), 1), (Seq("AAA", "BBB"), 1))),
+      ("utf8_binary", Seq("array('aaa')", "array('bbb')", "array('AAA')", "array('BBB')"),
+        Seq((Seq("aaa"), 1), (Seq("bbb"), 1), (Seq("AAA"), 1), (Seq("BBB"), 1))),
+      // non-binary
+      ("utf8_binary_lcase",
+        Seq(
+          "array('aaa' collate utf8_binary_lcase)",
+          "array('AAA' collate utf8_binary_lcase)"
+        ),
+        Seq((Seq("aaa"), 2))),
+      ("utf8_binary_lcase",
+        Seq(
+          "array('aaa' collate utf8_binary_lcase, 'bbb' collate utf8_binary_lcase)",
+          "array('AAA' collate utf8_binary_lcase, 'BBB' collate utf8_binary_lcase)"
+        ),
+        Seq((Seq("aaa", "bbb"), 2))),
+      ("utf8_binary_lcase",
+        Seq(
+          "array('aaa' collate utf8_binary_lcase)",
+          "array('bbb' collate utf8_binary_lcase)",
+          "array('AAA' collate utf8_binary_lcase)",
+          "array('BBB' collate utf8_binary_lcase)"
+        ),
+        Seq((Seq("aaa"), 2), (Seq("bbb"), 2)))
+    )
+    val nested = Seq(
+      // binary
+      ("utf8_binary", Seq("array(array('aaa'))", "array(array('AAA'))"),
+        Seq((Seq(Seq("aaa")), 1), (Seq(Seq("AAA")), 1))),
+      ("utf8_binary", Seq("array(array('aaa'), array('bbb'))", "array(array('AAA'), array('bbb'))"),
+        Seq((Seq(Seq("aaa"), Seq("bbb")), 1), (Seq(Seq("AAA"), Seq("bbb")), 1))),
+      // non-binary
+      ("utf8_binary_lcase",
+        Seq(
+          "array(array('aaa' collate utf8_binary_lcase))",
+          "array(array('AAA' collate utf8_binary_lcase))"
+        ),
+        Seq((Seq(Seq("aaa")), 2))),
+      ("utf8_binary_lcase",
+        Seq(
+          "array(array('aaa' collate utf8_binary_lcase), array('bbb' collate utf8_binary_lcase))",
+          "array(array('AAA' collate utf8_binary_lcase), array('bbb' collate utf8_binary_lcase))"
+        ),
+        Seq((Seq(Seq("aaa"), Seq("bbb")), 2))),
+      ("utf8_binary_lcase",
+        Seq(
+          "array(array('aaa' collate utf8_binary_lcase, 'AAA' collate utf8_binary_lcase)," +
+            "array('bbb' collate utf8_binary_lcase, 'ccc' collate utf8_binary_lcase))",
+          "array(array('aaa' collate utf8_binary_lcase, 'aaa' collate utf8_binary_lcase)," +
+            "array('bbb' collate utf8_binary_lcase, 'ccc' collate utf8_binary_lcase)," +
+            "array('ddd' collate utf8_binary_lcase))",
+          "array(array('AAA' collate utf8_binary_lcase, 'aaa' collate utf8_binary_lcase)," +
+            "array('BBB' collate utf8_binary_lcase, 'CCC' collate utf8_binary_lcase))"
+        ),
+        Seq(
+          (Seq(Seq("aaa", "AAA"), Seq("bbb", "ccc")), 2),
+          (Seq(Seq("aaa", "aaa"), Seq("bbb", "ccc"), Seq("ddd")), 1)
+        )
+      )
+    )
+
+    val all = simple.map {
+      case (collName, data, res) => (s"array<string collate $collName>", data, res)
+    } ++ nested.map {
+      case (collName, data, res) => (s"array<array<string collate $collName>>", data, res)
+    }
+
+    all.foreach {
+      case (dt, rows, count) =>
+        withTable(tableName) {
+          sql(s"create table $tableName(a $dt) using parquet")
+          rows.map(row => sql(s"insert into $tableName(a) values($row)"))
+          checkAnswer(sql(s"select a, count(*) from $tableName group by a"),
+            count.map { case (aggStr, cnt) => Row(aggStr, cnt) })
+          checkAnswer(sql(s"select distinct a from $tableName"),
+            count.map{ case (aggStr, _) => Row(aggStr)})
+        }
+    }
+  }

Review Comment:
   Updated tests to support this and added case classes for better readability. Please check again.





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1533375207


##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -640,6 +641,196 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
         "reason" -> "generation expression cannot contain non-default collated string type"))
   }
 
+  trait ArrayCheck {
+    def dataType: String
+    def dataTypeCollated: String
+  }
+
+  trait ArrayCheckSimple extends ArrayCheck {
+    override def dataType: String = s"array<string>"
+    override def dataTypeCollated: String = s"array<string collate utf8_binary_lcase>"
+  }
+
+  trait ArrayCheckNested extends ArrayCheck {
+    override def dataType: String = s"array<array<string>>"
+    override def dataTypeCollated: String = s"array<array<string collate utf8_binary_lcase>>"
+  }
+
+  test("Aggregation of arrays built on collated strings") {
+    abstract class AggCheck(val rows: Seq[String], val result: Seq[(Any, Int)]) extends ArrayCheck
+
+    case class AggCheckSimple(override val rows: Seq[String],
+                              override val result: Seq[(Seq[String], Int)])
+      extends AggCheck(rows, result) with ArrayCheckSimple
+
+    case class AggCheckNested(override val rows: Seq[String],
+                              override val result: Seq[(Seq[Seq[String]], Int)])
+      extends AggCheck(rows, result) with ArrayCheckNested
+
+    val tableName = "test_agg_arr_collated"
+    val tableNameLowercase = "test_agg_arr_collated_lowercase"
+
+    Seq(
+      // simple
+      AggCheckSimple(
+        rows = Seq("array('aaa')", "array('AAA')"),
+        result = Seq((Seq("aaa"), 2))
+      ),
+      AggCheckSimple(
+        rows = Seq("array('aaa', 'bbb')", "array('AAA', 'BBB')"),
+        result = Seq((Seq("aaa", "bbb"), 2))
+      ),
+      AggCheckSimple(
+        rows = Seq("array('aaa')", "array('bbb')", "array('AAA')", "array('BBB')"),
+        result = Seq((Seq("aaa"), 2), (Seq("bbb"), 2))
+      ),
+      // nested
+      AggCheckNested(
+        rows = Seq("array(array('aaa'))", "array(array('AAA'))"),
+        result = Seq((Seq(Seq("aaa")), 2))
+      ),
+      AggCheckNested(
+        rows = Seq("array(array('aaa'), array('bbb'))", "array(array('AAA'), array('bbb'))"),
+        result = Seq((Seq(Seq("aaa"), Seq("bbb")), 2))
+      ),
+      AggCheckNested(
+        rows = Seq(
+          "array(array('aaa', 'aaa'), array('bbb', 'ccc'))",
+          "array(array('aaa', 'aaa'), array('bbb', 'ccc'), array('ddd'))",
+          "array(array('AAA', 'AAA'), array('BBB', 'CCC'))"
+        ),
+        result = Seq(
+          (Seq(Seq("aaa", "aaa"), Seq("bbb", "ccc")), 2),
+          (Seq(Seq("aaa", "aaa"), Seq("bbb", "ccc"), Seq("ddd")), 1)
+        )
+      )
+    ).map((check: AggCheck) =>
+        withTable(tableName, tableNameLowercase) {
+          def checkResults(table: String): Unit = {
+            checkAnswer(sql(s"select a, count(*) from $table group by a"),
+              check.result.map{ case (agg, cnt) => Row(agg, cnt) })
+            checkAnswer(sql(s"select distinct a from $table"),
+              check.result.map{ case (agg, _) => Row(agg) })
+          }
+
+          // check against non-binary collation
+          sql(s"create table $tableName(a ${check.dataTypeCollated}) using parquet")
+          check.rows.map(row => sql(s"insert into $tableName(a) values($row)"))
+          checkResults(tableName)
+
+          // binary collation with values converted to lowercase should match the results as well
+          sql(s"create table $tableNameLowercase(a ${check.dataType}) using parquet")
+          check.rows.map(row =>
+            // scalastyle:off caselocale
+            sql(
+              s"""
+                 |insert into $tableNameLowercase(a)
+                 |values(${UTF8String.fromString(row).toLowerCase})
+                 |""".stripMargin)
+            // scalastyle:on caselocale
+          )
+          checkResults(tableNameLowercase)
+        }
+    )
+  }
+
+  test("Join on arrays of collated strings") {
+    abstract class JoinCheck(val leftRows: Seq[String],
+                             val rightRows: Seq[String],
+                             val resultRows: Seq[Any])
+      extends ArrayCheck

Review Comment:
   ```suggestion
       abstract class JoinCheck(
           val leftRows: Seq[String],
           val rightRows: Seq[String],
           val resultRows: Seq[Any])
         extends ArrayCheck
   ```
   per https://github.com/databricks/scala-style-guide?tab=readme-ov-file#spacing-and-indentation and below too





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "nikolamand-db (via GitHub)" <gi...@apache.org>.
nikolamand-db commented on PR #45611:
URL: https://github.com/apache/spark/pull/45611#issuecomment-2013417373

   @MaxGekk @cloud-fan please check the updated tests.




Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk closed pull request #45611: [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings
URL: https://github.com/apache/spark/pull/45611




Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1531872172


##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -639,4 +639,169 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
         "expressionStr" -> "SUBSTRING(struct1.a, 0, 1)",
         "reason" -> "generation expression cannot contain non-default collated string type"))
   }
+
+  test("Aggregation of arrays built on collated strings") {
+    val tableName = "test_agg_arr_collated"
+    val simple = Seq(
+      // binary
+      ("utf8_binary", Seq("array('aaa')", "array('AAA')"),
+        Seq((Seq("aaa"), 1), (Seq("AAA"), 1))),
+      ("utf8_binary", Seq("array('aaa', 'bbb')", "array('AAA', 'BBB')"),
+        Seq((Seq("aaa", "bbb"), 1), (Seq("AAA", "BBB"), 1))),
+      ("utf8_binary", Seq("array('aaa')", "array('bbb')", "array('AAA')", "array('BBB')"),
+        Seq((Seq("aaa"), 1), (Seq("bbb"), 1), (Seq("AAA"), 1), (Seq("BBB"), 1))),
+      // non-binary
+      ("utf8_binary_lcase",
+        Seq(
+          "array('aaa' collate utf8_binary_lcase)",
+          "array('AAA' collate utf8_binary_lcase)"
+        ),
+        Seq((Seq("aaa"), 2))),
+      ("utf8_binary_lcase",
+        Seq(
+          "array('aaa' collate utf8_binary_lcase, 'bbb' collate utf8_binary_lcase)",
+          "array('AAA' collate utf8_binary_lcase, 'BBB' collate utf8_binary_lcase)"
+        ),
+        Seq((Seq("aaa", "bbb"), 2))),
+      ("utf8_binary_lcase",
+        Seq(
+          "array('aaa' collate utf8_binary_lcase)",
+          "array('bbb' collate utf8_binary_lcase)",
+          "array('AAA' collate utf8_binary_lcase)",
+          "array('BBB' collate utf8_binary_lcase)"
+        ),
+        Seq((Seq("aaa"), 2), (Seq("bbb"), 2)))
+    )
+    val nested = Seq(
+      // binary
+      ("utf8_binary", Seq("array(array('aaa'))", "array(array('AAA'))"),
+        Seq((Seq(Seq("aaa")), 1), (Seq(Seq("AAA")), 1))),
+      ("utf8_binary", Seq("array(array('aaa'), array('bbb'))", "array(array('AAA'), array('bbb'))"),
+        Seq((Seq(Seq("aaa"), Seq("bbb")), 1), (Seq(Seq("AAA"), Seq("bbb")), 1))),
+      // non-binary
+      ("utf8_binary_lcase",
+        Seq(
+          "array(array('aaa' collate utf8_binary_lcase))",
+          "array(array('AAA' collate utf8_binary_lcase))"
+        ),
+        Seq((Seq(Seq("aaa")), 2))),
+      ("utf8_binary_lcase",
+        Seq(
+          "array(array('aaa' collate utf8_binary_lcase), array('bbb' collate utf8_binary_lcase))",
+          "array(array('AAA' collate utf8_binary_lcase), array('bbb' collate utf8_binary_lcase))"
+        ),
+        Seq((Seq(Seq("aaa"), Seq("bbb")), 2))),
+      ("utf8_binary_lcase",
+        Seq(
+          "array(array('aaa' collate utf8_binary_lcase, 'AAA' collate utf8_binary_lcase)," +
+            "array('bbb' collate utf8_binary_lcase, 'ccc' collate utf8_binary_lcase))",
+          "array(array('aaa' collate utf8_binary_lcase, 'aaa' collate utf8_binary_lcase)," +
+            "array('bbb' collate utf8_binary_lcase, 'ccc' collate utf8_binary_lcase)," +
+            "array('ddd' collate utf8_binary_lcase))",
+          "array(array('AAA' collate utf8_binary_lcase, 'aaa' collate utf8_binary_lcase)," +
+            "array('BBB' collate utf8_binary_lcase, 'CCC' collate utf8_binary_lcase))"
+        ),
+        Seq(
+          (Seq(Seq("aaa", "AAA"), Seq("bbb", "ccc")), 2),
+          (Seq(Seq("aaa", "aaa"), Seq("bbb", "ccc"), Seq("ddd")), 1)
+        )
+      )
+    )
+
+    val all = simple.map {
+      case (collName, data, res) => (s"array<string collate $collName>", data, res)
+    } ++ nested.map {
+      case (collName, data, res) => (s"array<array<string collate $collName>>", data, res)
+    }
+
+    all.foreach {
+      case (dt, rows, count) =>
+        withTable(tableName) {
+          sql(s"create table $tableName(a $dt) using parquet")
+          rows.map(row => sql(s"insert into $tableName(a) values($row)"))
+          checkAnswer(sql(s"select a, count(*) from $tableName group by a"),
+            count.map { case (aggStr, cnt) => Row(aggStr, cnt) })
+          checkAnswer(sql(s"select distinct a from $tableName"),
+            count.map{ case (aggStr, _) => Row(aggStr)})
+        }
+    }
+  }

Review Comment:
   A test pattern that can be useful here is what we did for window aggs.
   In short, create one query that targets mixed-case data with the LCASE collation (e.g. "aA", "aa", "AA") and another that targets normalized data with UTF8_BINARY ("aa", "aa", "aa"). Aggregations and joins should return the same result for both.
   
   You can find test example here:
   https://github.com/apache/spark/pull/45568
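
As a rough, Spark-free model of this pattern (all names below are made up for illustration): group the mixed-case rows under a lowercase-insensitive comparison, group the pre-lowercased rows under plain equality, and assert both groupings produce the same counts.

```scala
object CollationTestPattern {
  // Count occurrences of each key after applying a normalization function.
  def groupCounts(
      rows: Seq[Seq[String]],
      key: Seq[String] => Seq[String]): Map[Seq[String], Int] =
    rows.groupBy(key).map { case (k, v) => (k, v.size) }

  def main(args: Array[String]): Unit = {
    val mixedCase = Seq(Seq("aA"), Seq("aa"), Seq("AA"))

    // "LCASE collation": keys compare equal when they match case-insensitively.
    val lcaseCounts = groupCounts(mixedCase, _.map(_.toLowerCase))

    // "UTF8_BINARY" over normalized data: plain equality on lowercased rows.
    val normalized = mixedCase.map(_.map(_.toLowerCase))
    val binaryCounts = groupCounts(normalized, identity)

    // The two groupings must agree, mirroring the suggested end-to-end check.
    assert(lcaseCounts == binaryCounts)
    assert(lcaseCounts(Seq("aa")) == 3)
  }
}
```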





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1531861181


##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -639,4 +639,169 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
         "expressionStr" -> "SUBSTRING(struct1.a, 0, 1)",
         "reason" -> "generation expression cannot contain non-default collated string type"))
   }
+
+  test("Aggregation of arrays built on collated strings") {
+    val tableName = "test_agg_arr_collated"
+    val simple = Seq(
+      // binary
+      ("utf8_binary", Seq("array('aaa')", "array('AAA')"),
+        Seq((Seq("aaa"), 1), (Seq("AAA"), 1))),
+      ("utf8_binary", Seq("array('aaa', 'bbb')", "array('AAA', 'BBB')"),
+        Seq((Seq("aaa", "bbb"), 1), (Seq("AAA", "BBB"), 1))),
+      ("utf8_binary", Seq("array('aaa')", "array('bbb')", "array('AAA')", "array('BBB')"),
+        Seq((Seq("aaa"), 1), (Seq("bbb"), 1), (Seq("AAA"), 1), (Seq("BBB"), 1))),

Review Comment:
   Do we want to cover the UTF8_BINARY test case here at all? In previous examples we mainly used UTF8_BINARY for validation, but what we really want to test here are collations other than UTF8_BINARY.





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1533372861


##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##########
@@ -640,6 +641,196 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
         "reason" -> "generation expression cannot contain non-default collated string type"))
   }
 
+  trait ArrayCheck {
+    def dataType: String
+    def dataTypeCollated: String
+  }
+
+  trait ArrayCheckSimple extends ArrayCheck {
+    override def dataType: String = s"array<string>"

Review Comment:
   nit: let's remove those.
   
   ```suggestion
       override def dataType: String = "array<string>"
   ```





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1534247221


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala:
##########
@@ -204,8 +204,8 @@ object UnsafeRowUtils {
    * e.g. this is not true for non-binary collations (any case/accent insensitive collation
    * can lead to rows being semantically equal even though their binary representations differ).
    */
-  def isBinaryStable(dataType: DataType): Boolean = dataType.existsRecursively {
-    case st: StringType => CollationFactory.fetchCollation(st.collationId).isBinaryCollation
-    case _ => true
+  def isBinaryStable(dataType: DataType): Boolean = !dataType.existsRecursively {

Review Comment:
   Why is `isBinaryStable` in `UnsafeRowUtils`? Is the implementation somehow bound to unsafe rows?
   
   Why is it not in `DataTypeUtils` or `Collation...`, for example?





Re: [PR] [SPARK-47483][SQL] Add support for aggregation and join operations on arrays of collated strings [spark]

Posted by "nikolamand-db (via GitHub)" <gi...@apache.org>.
nikolamand-db commented on code in PR #45611:
URL: https://github.com/apache/spark/pull/45611#discussion_r1534525696


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala:
##########
@@ -204,8 +204,8 @@ object UnsafeRowUtils {
    * e.g. this is not true for non-binary collations (any case/accent insensitive collation
    * can lead to rows being semantically equal even though their binary representations differ).
    */
-  def isBinaryStable(dataType: DataType): Boolean = dataType.existsRecursively {
-    case st: StringType => CollationFactory.fetchCollation(st.collationId).isBinaryCollation
-    case _ => true
+  def isBinaryStable(dataType: DataType): Boolean = !dataType.existsRecursively {

Review Comment:
   Not sure what the consequences of moving this function would be; do you know if we can do that, @dbatomic?


