You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2024/02/05 16:22:24 UTC

(spark) branch master updated: [SPARK-46833][SQL] Collations - Introducing CollationFactory which provides comparison and hashing rules for supported collations

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ff5f396632d [SPARK-46833][SQL] Collations - Introducing CollationFactory which provides comparison and hashing rules for supported collations
6ff5f396632d is described below

commit 6ff5f396632d5b715df083ade81349206373c78c
Author: Aleksandar Tomic <al...@databricks.com>
AuthorDate: Tue Feb 6 00:22:08 2024 +0800

    [SPARK-46833][SQL] Collations - Introducing CollationFactory which provides comparison and hashing rules for supported collations
    
    ### What changes were proposed in this pull request?
    
    This PR introduces CollationFactory singleton class which provides all collation aware methods, for given collation, that can be invoked against UTF8Strings.
    
    For higher level overview of Collation track please take a look at the umbrella [JIRA](https://issues.apache.org/jira/browse/SPARK-46830).
    
    At this point, CollationFactory is still not exposed, besides tests. The connection between UTF8String and CollationFactory is coming in the next PR.
    
    ### Why are the changes needed?
    
    Please refer to umbrella JIRA ticket for collation effort.
    
    ### Does this PR introduce _any_ user-facing change?
    
    At this point No, this is just prep for user facing changes.
    
    ### How was this patch tested?
    
    Unit tests for CollationFactory come with this PR. Tests are basic sanity tests, proper testing will come as we get E2E features.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44968 from dbatomic/utf8_collation_extension.
    
    Lead-authored-by: Aleksandar Tomic <al...@databricks.com>
    Co-authored-by: Aleksandar Tomic <15...@users.noreply.github.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 common/unsafe/pom.xml                              |   6 +
 .../spark/sql/catalyst/util/CollationFactory.java  | 175 +++++++++++++++++++++
 .../spark/unsafe/types/CollationFactorySuite.scala | 116 ++++++++++++++
 .../src/main/resources/error/error-classes.json    |   6 +
 .../scala/org/apache/spark/SparkException.scala    |  10 ++
 dev/deps/spark-deps-hadoop-3-hive-2.3              |   1 +
 docs/sql-error-conditions.md                       |   6 +
 pom.xml                                            |   6 +
 sql/core/pom.xml                                   |   4 +
 9 files changed, 330 insertions(+)

diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index 4d23c9149bb7..e9785ebb7ad4 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -47,6 +47,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>com.ibm.icu</groupId>
+      <artifactId>icu4j</artifactId>
+      <version>${icu4j.version}</version>
+    </dependency>
+
     <!--
       This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
       them will yield errors.
diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
new file mode 100644
index 000000000000..018fb6cbeb9f
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util;
+
+import java.util.*;
+import java.util.function.BiFunction;
+import java.util.function.ToLongFunction;
+
+import com.ibm.icu.util.ULocale;
+import com.ibm.icu.text.Collator;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Static entry point for collation aware string functions.
+ * Provides functionality to the UTF8String object which respects defined collation settings.
+ */
+public final class CollationFactory {
+  /**
+   * Entry encapsulating all information about a collation.
+   */
+  public static class Collation {
+    public final String collationName;
+    public final Collator collator;
+    public final Comparator<UTF8String> comparator;
+
+    /**
+     * Version of the collation. This is the version of the ICU library Collator.
+     * For non-ICU collations (e.g. UTF8 Binary) the version is set to "1.0".
+     * When using ICU Collator this version is exposed through collator.getVersion().
+     * Whenever the collation is updated, the version should be updated as well or kept
+     * for backwards compatibility.
+     */
+    public final String version;
+
+    /**
+     * Collation sensitive hash function. Output for two UTF8Strings will be the same if they are
+     * equal according to the collation.
+     */
+    public final ToLongFunction<UTF8String> hashFunction;
+
+    /**
+     * Potentially faster way than using comparator to compare two UTF8Strings for equality.
+     * Falls back to binary comparison if the collation is binary.
+     */
+    public final BiFunction<UTF8String, UTF8String, Boolean> equalsFunction;
+
+    /**
+     * Binary collation implies that UTF8Strings are considered equal only if they are
+     * byte for byte equal. All accent or case-insensitive collations are considered non-binary.
+     */
+    public final boolean isBinaryCollation;
+
+    public Collation(
+        String collationName,
+        Collator collator,
+        Comparator<UTF8String> comparator,
+        String version,
+        ToLongFunction<UTF8String> hashFunction,
+        boolean isBinaryCollation) {
+      this.collationName = collationName;
+      this.collator = collator;
+      this.comparator = comparator;
+      this.version = version;
+      this.hashFunction = hashFunction;
+      this.isBinaryCollation = isBinaryCollation;
+
+      if (isBinaryCollation) {
+        this.equalsFunction = UTF8String::equals;
+      } else {
+        this.equalsFunction = (s1, s2) -> this.comparator.compare(s1, s2) == 0;
+      }
+    }
+
+    /**
+     * Constructor with comparators that are inherited from the given collator.
+     */
+    public Collation(
+        String collationName, Collator collator, String version, boolean isBinaryCollation) {
+      this(
+        collationName,
+        collator,
+        (s1, s2) -> collator.compare(s1.toString(), s2.toString()),
+        version,
+        s -> (long)collator.getCollationKey(s.toString()).hashCode(),
+        isBinaryCollation);
+    }
+  }
+
+  private static final Collation[] collationTable = new Collation[4];
+  private static final HashMap<String, Integer> collationNameToIdMap = new HashMap<>();
+
+  static {
+    // Binary comparison. This is the default collation.
+    // No custom comparators will be used for this collation.
+    // Instead, we rely on byte for byte comparison.
+    collationTable[0] = new Collation(
+      "UCS_BASIC",
+      null,
+      UTF8String::compareTo,
+      "1.0",
+      s -> (long)s.hashCode(),
+      true);
+
+    // Case-insensitive UTF8 binary collation.
+    // TODO: Do in place comparisons instead of creating new strings.
+    collationTable[1] = new Collation(
+      "UCS_BASIC_LCASE",
+      null,
+      Comparator.comparing(UTF8String::toLowerCase),
+      "1.0",
+      (s) -> (long)s.toLowerCase().hashCode(),
+      false);
+
+    // UNICODE case sensitive comparison (ROOT locale, in ICU).
+    collationTable[2] = new Collation(
+      "UNICODE", Collator.getInstance(ULocale.ROOT), "153.120.0.0", true);
+    collationTable[2].collator.setStrength(Collator.TERTIARY);
+
+
+    // UNICODE case-insensitive comparison (ROOT locale, in ICU + Secondary strength).
+    collationTable[3] = new Collation(
+      "UNICODE_CI", Collator.getInstance(ULocale.ROOT), "153.120.0.0", false);
+    collationTable[3].collator.setStrength(Collator.SECONDARY);
+
+    for (int i = 0; i < collationTable.length; i++) {
+      collationNameToIdMap.put(collationTable[i].collationName, i);
+    }
+  }
+
+  /**
+   * Returns the collation id for the given collation name.
+   */
+  public static int collationNameToId(String collationName) throws SparkException {
+    String normalizedName = collationName.toUpperCase();
+    if (collationNameToIdMap.containsKey(normalizedName)) {
+      return collationNameToIdMap.get(normalizedName);
+    } else {
+      Collation suggestion = Collections.min(List.of(collationTable), Comparator.comparingInt(
+        c -> UTF8String.fromString(c.collationName).levenshteinDistance(
+          UTF8String.fromString(normalizedName))));
+
+      Map<String, String> params = new HashMap<>();
+      params.put("collationName", collationName);
+      params.put("proposal", suggestion.collationName);
+
+      throw new SparkException(
+        "COLLATION_INVALID_NAME", SparkException.constructMessageParams(params), null);
+    }
+  }
+
+  public static Collation fetchCollation(int collationId) {
+    return collationTable[collationId];
+  }
+
+  public static Collation fetchCollation(String collationName) throws SparkException {
+    int collationId = collationNameToId(collationName);
+    return collationTable[collationId];
+  }
+}
diff --git a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
new file mode 100644
index 000000000000..c7f72a49c5ea
--- /dev/null
+++ b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types
+
+import scala.jdk.CollectionConverters.MapHasAsScala
+
+import org.apache.spark.SparkException
+// scalastyle:off
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.matchers.must.Matchers
+
+import org.apache.spark.sql.catalyst.util.CollationFactory._
+import org.apache.spark.unsafe.types.UTF8String.{fromString => toUTF8}
+
+class CollationFactorySuite extends AnyFunSuite with Matchers { // scalastyle:ignore funsuite
+  test("collationId stability") {
+    val ucsBasic = fetchCollation(0)
+    assert(ucsBasic.collationName == "UCS_BASIC")
+    assert(ucsBasic.isBinaryCollation)
+
+    val ucsBasicLcase = fetchCollation(1)
+    assert(ucsBasicLcase.collationName == "UCS_BASIC_LCASE")
+    assert(!ucsBasicLcase.isBinaryCollation)
+
+    val unicode = fetchCollation(2)
+    assert(unicode.collationName == "UNICODE")
+    assert(unicode.isBinaryCollation);
+
+    val unicodeCi = fetchCollation(3)
+    assert(unicodeCi.collationName == "UNICODE_CI")
+    assert(!unicodeCi.isBinaryCollation)
+  }
+
+  test("fetch invalid collation name") {
+    val error = intercept[SparkException] {
+      fetchCollation("UCS_BASIS")
+    }
+
+    assert(error.getErrorClass === "COLLATION_INVALID_NAME")
+    assert(error.getMessageParameters.asScala ===
+      Map("proposal" -> "UCS_BASIC", "collationName" -> "UCS_BASIS"))
+  }
+
+  case class CollationTestCase[R](collationName: String, s1: String, s2: String, expectedResult: R)
+
+  test("collation aware equality and hash") {
+    val checks = Seq(
+      CollationTestCase("UCS_BASIC", "aaa", "aaa", true),
+      CollationTestCase("UCS_BASIC", "aaa", "AAA", false),
+      CollationTestCase("UCS_BASIC", "aaa", "bbb", false),
+      CollationTestCase("UCS_BASIC_LCASE", "aaa", "aaa", true),
+      CollationTestCase("UCS_BASIC_LCASE", "aaa", "AAA", true),
+      CollationTestCase("UCS_BASIC_LCASE", "aaa", "AaA", true),
+      CollationTestCase("UCS_BASIC_LCASE", "aaa", "AaA", true),
+      CollationTestCase("UCS_BASIC_LCASE", "aaa", "aa", false),
+      CollationTestCase("UCS_BASIC_LCASE", "aaa", "bbb", false),
+      CollationTestCase("UNICODE", "aaa", "aaa", true),
+      CollationTestCase("UNICODE", "aaa", "AAA", false),
+      CollationTestCase("UNICODE", "aaa", "bbb", false),
+      CollationTestCase("UNICODE_CI", "aaa", "aaa", true),
+      CollationTestCase("UNICODE_CI", "aaa", "AAA", true),
+      CollationTestCase("UNICODE_CI", "aaa", "bbb", false))
+
+    checks.foreach(testCase => {
+      val collation = fetchCollation(testCase.collationName)
+      assert(collation.equalsFunction(toUTF8(testCase.s1), toUTF8(testCase.s2)) ==
+        testCase.expectedResult)
+
+      val hash1 = collation.hashFunction.applyAsLong(toUTF8(testCase.s1))
+      val hash2 = collation.hashFunction.applyAsLong(toUTF8(testCase.s1))
+      assert(hash1 == hash2)
+    })
+  }
+
+  test("collation aware compare") {
+    val checks = Seq(
+      CollationTestCase("UCS_BASIC", "aaa", "aaa", 0),
+      CollationTestCase("UCS_BASIC", "aaa", "AAA", 1),
+      CollationTestCase("UCS_BASIC", "aaa", "bbb", -1),
+      CollationTestCase("UCS_BASIC", "aaa", "BBB", 1),
+      CollationTestCase("UCS_BASIC_LCASE", "aaa", "aaa", 0),
+      CollationTestCase("UCS_BASIC_LCASE", "aaa", "AAA", 0),
+      CollationTestCase("UCS_BASIC_LCASE", "aaa", "AaA", 0),
+      CollationTestCase("UCS_BASIC_LCASE", "aaa", "AaA", 0),
+      CollationTestCase("UCS_BASIC_LCASE", "aaa", "aa", 1),
+      CollationTestCase("UCS_BASIC_LCASE", "aaa", "bbb", -1),
+      CollationTestCase("UNICODE", "aaa", "aaa", 0),
+      CollationTestCase("UNICODE", "aaa", "AAA", -1),
+      CollationTestCase("UNICODE", "aaa", "bbb", -1),
+      CollationTestCase("UNICODE", "aaa", "BBB", -1),
+      CollationTestCase("UNICODE_CI", "aaa", "aaa", 0),
+      CollationTestCase("UNICODE_CI", "aaa", "AAA", 0),
+      CollationTestCase("UNICODE_CI", "aaa", "bbb", -1))
+
+    checks.foreach(testCase => {
+      val collation = fetchCollation(testCase.collationName)
+      val result = collation.comparator.compare(toUTF8(testCase.s1), toUTF8(testCase.s2))
+      assert(Integer.signum(result) == testCase.expectedResult)
+    })
+  }
+}
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index ef9e81c98e05..2a1c044c8d2a 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -463,6 +463,12 @@
     ],
     "sqlState" : "42704"
   },
+  "COLLATION_INVALID_NAME" : {
+    "message" : [
+      "The value <collationName> does not represent a correct collation name. Suggested valid collation name: [<proposal>]."
+    ],
+    "sqlState" : "42704"
+  },
   "COLLECTION_SIZE_LIMIT_EXCEEDED" : {
     "message" : [
       "Can't create array with <numberOfElements> elements which exceeding the array size limit <maxRoundedArrayLength>,"
diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index ebb6e772249b..899082e550f9 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -121,6 +121,16 @@ object SparkException {
       throw new SparkIllegalArgumentException(errorClass, messageParameters)
     }
   }
+
+  /**
+   * Utility method to construct message params from Java Map.
+   * @param messageParameters The Java Map.
+   * @return Scala collection that can be passed to SparkException constructor.
+   */
+  def constructMessageParams(
+      messageParameters: java.util.Map[String, String]): Map[String, String] = {
+    messageParameters.asScala.toMap
+  }
 }
 
 /**
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index e02733883642..dd8d74888c6a 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -99,6 +99,7 @@ hk2-locator/2.6.1//hk2-locator-2.6.1.jar
 hk2-utils/2.6.1//hk2-utils-2.6.1.jar
 httpclient/4.5.14//httpclient-4.5.14.jar
 httpcore/4.4.16//httpcore-4.4.16.jar
+icu4j/72.1//icu4j-72.1.jar
 ini4j/0.5.4//ini4j-0.5.4.jar
 istack-commons-runtime/3.0.8//istack-commons-runtime-3.0.8.jar
 ivy/2.5.1//ivy-2.5.1.jar
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 359d8645ef20..8dd409d82bbd 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -386,6 +386,12 @@ For more details see [CODEC_NOT_AVAILABLE](sql-error-conditions-codec-not-availa
 
 Cannot find a short name for the codec `<codecName>`.
 
+### COLLATION_INVALID_NAME
+
+[SQLSTATE: 42704](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+The value `<collationName>` does not represent a correct collation name. Suggested valid collation name: [`<proposal>`].
+
 ### [COLLECTION_SIZE_LIMIT_EXCEEDED](sql-error-conditions-collection-size-limit-exceeded-error-class.html)
 
 [SQLSTATE: 54000](sql-error-conditions-sqlstates.html#class-54-program-limit-exceeded)
diff --git a/pom.xml b/pom.xml
index 2fc14a4cdede..35452ba0d734 100644
--- a/pom.xml
+++ b/pom.xml
@@ -226,6 +226,7 @@
     <datasketches.version>5.0.1</datasketches.version>
     <netty.version>4.1.106.Final</netty.version>
     <netty-tcnative.version>2.0.61.Final</netty-tcnative.version>
+    <icu4j.version>72.1</icu4j.version>
     <!--
     If you are changing Arrow version specification, please check
     ./python/pyspark/sql/pandas/utils.py, and ./python/setup.py too.
@@ -2744,6 +2745,11 @@
         <artifactId>antlr4-runtime</artifactId>
         <version>${antlr4.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.ibm.icu</groupId>
+        <artifactId>icu4j</artifactId>
+        <version>${icu4j.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-crypto</artifactId>
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index c83104a6bd45..f2d888ceafe7 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -179,6 +179,10 @@
       <artifactId>jcc</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.ibm.icu</groupId>
+      <artifactId>icu4j</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.microsoft.sqlserver</groupId>
       <artifactId>mssql-jdbc</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org