You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2023/06/30 16:44:27 UTC

[spark] branch master updated: [SPARK-43986][SQL] Create error classes for HyperLogLog function call failures

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ab67f461987 [SPARK-43986][SQL] Create error classes for HyperLogLog function call failures
ab67f461987 is described below

commit ab67f4619873f21b5dcf7f67658afce7e1028657
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Fri Jun 30 19:44:14 2023 +0300

    [SPARK-43986][SQL] Create error classes for HyperLogLog function call failures
    
    ### What changes were proposed in this pull request?
    
    This PR creates error classes for HyperLogLog function call failures.
    
    ### Why are the changes needed?
    
    These replace the previously thrown Java exceptions (such as `SketchesArgumentException` and `UnsupportedOperationException`) and other ad-hoc failures, in order to improve the user experience and bring consistency with other parts of Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, error messages change slightly, and these failures are now raised as `SparkRuntimeException`s carrying the new error classes.
    
    ### How was this patch tested?
    
    This PR adds SQL query test files for the HLL functions and updates the existing HLL tests in `DataFrameAggregateSuite` to check the new error classes.
    
    Closes #41486 from dtenedor/hll-error-classes.
    
    Authored-by: Daniel Tenedorio <da...@databricks.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    |  15 +
 .../aggregate/datasketchesAggregates.scala         |  71 +++--
 .../expressions/datasketchesExpressions.scala      |  29 +-
 .../spark/sql/errors/QueryExecutionErrors.scala    |  26 ++
 .../sql-tests/analyzer-results/hll.sql.out         | 215 +++++++++++++
 .../src/test/resources/sql-tests/inputs/hll.sql    |  76 +++++
 .../test/resources/sql-tests/results/hll.sql.out   | 262 ++++++++++++++++
 .../apache/spark/sql/DataFrameAggregateSuite.scala | 338 ++++++++++++---------
 8 files changed, 850 insertions(+), 182 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index db6b9a97012..abe88db1267 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -782,6 +782,21 @@
       "The expression <sqlExpr> cannot be used as a grouping expression because its data type <dataType> is not an orderable data type."
     ]
   },
+  "HLL_INVALID_INPUT_SKETCH_BUFFER" : {
+    "message" : [
+      "Invalid call to <function>; only valid HLL sketch buffers are supported as inputs (such as those produced by the `hll_sketch_agg` function)."
+    ]
+  },
+  "HLL_INVALID_LG_K" : {
+    "message" : [
+      "Invalid call to <function>; the `lgConfigK` value must be between <min> and <max>, inclusive: <value>."
+    ]
+  },
+  "HLL_UNION_DIFFERENT_LG_K" : {
+    "message" : [
+      "Sketches have different `lgConfigK` values: <left> and <right>. Set the `allowDifferentLgConfigK` parameter to true to call <function> with different `lgConfigK` values."
+    ]
+  },
   "IDENTIFIER_TOO_MANY_NAME_PARTS" : {
     "message" : [
       "<identifier> is not a valid identifier as it has more than 2 name parts."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
index 8b24efe12b4..17c69f798d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
@@ -17,23 +17,23 @@
 
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
-import org.apache.datasketches.SketchesArgumentException
 import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
 import org.apache.datasketches.memory.Memory
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
 import org.apache.spark.sql.catalyst.trees.BinaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.{AbstractDataType, BinaryType, BooleanType, DataType, IntegerType, LongType, StringType, TypeCollection}
 import org.apache.spark.unsafe.types.UTF8String
 
 
 /**
- * The HllSketchAgg function utilizes a Datasketches HllSketch instance to
- * count a probabilistic approximation of the number of unique values in
- * a given column, and outputs the binary representation of the HllSketch.
+ * The HllSketchAgg function utilizes a Datasketches HllSketch instance to count a probabilistic
+ * approximation of the number of unique values in a given column, and outputs the binary
+ * representation of the HllSketch.
  *
- * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information.
  *
  * @param left child expression against which unique counting will occur
  * @param right the log-base-2 of K, where K is the number of buckets or slots for the sketch
@@ -41,7 +41,7 @@ import org.apache.spark.unsafe.types.UTF8String
 // scalastyle:off line.size.limit
 @ExpressionDescription(
   usage = """
-    _FUNC_(expr, lgConfigK) - Returns the HllSketch's updateable binary representation.
+    _FUNC_(expr, lgConfigK) - Returns the HllSketch's updatable binary representation.
       `lgConfigK` (optional) the log-base-2 of K, with K is the number of buckets or
       slots for the HllSketch. """,
   examples = """
@@ -117,9 +117,9 @@ case class HllSketchAgg(
   }
 
   /**
-   * Evaluate the input row and update the HllSketch instance with the row's value.
-   * The update function only supports a subset of Spark SQL types, and an
-   * UnsupportedOperationException will be thrown for unsupported types.
+   * Evaluate the input row and update the HllSketch instance with the row's value. The update
+   * function only supports a subset of Spark SQL types, and an exception will be thrown for
+   * unsupported types.
    *
    * @param sketch The HllSketch instance.
    * @param input  an input row
@@ -128,10 +128,10 @@ case class HllSketchAgg(
     val v = left.eval(input)
     if (v != null) {
       left.dataType match {
-        // Update implemented for a subset of types supported by HllSketch
-        // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out
-        // Leaving out support for Array types, as unique counting these aren't a common use case
-        // Leaving out support for floating point types (IE DoubleType) due to imprecision
+        // This is implemented for a subset of input data types.
+        // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out.
+        // We leave out support for Array types, as unique counting these aren't a common use case.
+        // We leave out support for floating point types (such as DoubleType) due to imprecision.
         // TODO: implement support for decimal/datetime/interval types
         case IntegerType => sketch.update(v.asInstanceOf[Int])
         case LongType => sketch.update(v.asInstanceOf[Long])
@@ -189,21 +189,20 @@ object HllSketchAgg {
   private val minLgConfigK = 4
   private val maxLgConfigK = 21
 
-  // Replicate Datasketche's HllUtil's checkLgK implementation, as we can't reference it directly
+  // Replicate Datasketches' HllUtil's checkLgK implementation, as we can't reference it directly.
   def checkLgK(lgConfigK: Int): Unit = {
     if (lgConfigK < minLgConfigK || lgConfigK > maxLgConfigK) {
-      throw new SketchesArgumentException(
-        s"Log K must be between $minLgConfigK and $maxLgConfigK, inclusive: " + lgConfigK)
+      throw QueryExecutionErrors.hllInvalidLgK(function = "hll_sketch_agg",
+        min = minLgConfigK, max = maxLgConfigK, value = lgConfigK.toString)
     }
   }
 }
 
 /**
- * The HllUnionAgg function ingests and merges Datasketches HllSketch
- * instances previously produced by the HllSketchBinary function, and
- * outputs the merged HllSketch.
+ * The HllUnionAgg function ingests and merges Datasketches HllSketch instances previously produced
+ * by the HllSketchBinary function, and outputs the merged HllSketch.
  *
- * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information.
  *
  * @param left Child expression against which unique counting will occur
  * @param right Allow sketches with different lgConfigK values
@@ -286,20 +285,15 @@ case class HllUnionAgg(
   }
 
   /**
-   * Helper method to compare lgConfigKs and throw an exception if
-   * allowDifferentLgConfigK isn't true and configs don't match
+   * Helper method to compare lgConfigKs and throw an exception if `allowDifferentLgConfigK` isn't
+   * true and configs don't match.
    *
    * @param left An lgConfigK value
    * @param right An lgConfigK value
    */
   def compareLgConfigK(left: Int, right: Int): Unit = {
-    if (!allowDifferentLgConfigK) {
-      if (left != right) {
-        throw new UnsupportedOperationException(
-          s"Sketches have different lgConfigK values: $left and $right. " +
-            "Set allowDifferentLgConfigK to true to enable unions of " +
-            "different lgConfigK values.")
-      }
+    if (!allowDifferentLgConfigK && left != right) {
+      throw QueryExecutionErrors.hllUnionDifferentLgK(left, right, function = prettyName)
     }
   }
 
@@ -314,13 +308,18 @@ case class HllUnionAgg(
     if (v != null) {
       left.dataType match {
         case BinaryType =>
-          val sketch = HllSketch.wrap(Memory.wrap(v.asInstanceOf[Array[Byte]]))
-          val union = unionOption.getOrElse(new Union(sketch.getLgConfigK))
-          compareLgConfigK(union.getLgConfigK, sketch.getLgConfigK)
-          union.update(sketch)
-          Some(union)
-        case _ => throw new UnsupportedOperationException(
-          s"A Union instance can only be updated with a valid HllSketch byte array")
+          try {
+            val sketch = HllSketch.wrap(Memory.wrap(v.asInstanceOf[Array[Byte]]))
+            val union = unionOption.getOrElse(new Union(sketch.getLgConfigK))
+            compareLgConfigK(union.getLgConfigK, sketch.getLgConfigK)
+            union.update(sketch)
+            Some(union)
+          } catch {
+            case _: java.lang.Error =>
+              throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName)
+          }
+        case _ =>
+          throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName)
       }
     } else {
       unionOption
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala
index 2f1c865e12b..9e3fe52f534 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala
@@ -21,6 +21,7 @@ import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
 import org.apache.datasketches.memory.Memory
 
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.{AbstractDataType, BinaryType, BooleanType, DataType, LongType}
 
 @ExpressionDescription(
@@ -51,7 +52,12 @@ case class HllSketchEstimate(child: Expression)
 
   override def nullSafeEval(input: Any): Any = {
     val buffer = input.asInstanceOf[Array[Byte]]
-    Math.round(HllSketch.heapify(Memory.wrap(buffer)).getEstimate)
+    try {
+      Math.round(HllSketch.heapify(Memory.wrap(buffer)).getEstimate)
+    } catch {
+      case _: java.lang.Error =>
+        throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName)
+    }
   }
 }
 
@@ -98,15 +104,22 @@ case class HllUnion(first: Expression, second: Expression, third: Expression)
   override def dataType: DataType = BinaryType
 
   override def nullSafeEval(value1: Any, value2: Any, value3: Any): Any = {
-    val sketch1 = HllSketch.heapify(Memory.wrap(value1.asInstanceOf[Array[Byte]]))
-    val sketch2 = HllSketch.heapify(Memory.wrap(value2.asInstanceOf[Array[Byte]]))
+    val sketch1 = try {
+      HllSketch.heapify(Memory.wrap(value1.asInstanceOf[Array[Byte]]))
+    } catch {
+      case _: java.lang.Error =>
+        throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName)
+    }
+    val sketch2 = try {
+      HllSketch.heapify(Memory.wrap(value2.asInstanceOf[Array[Byte]]))
+    } catch {
+      case _: java.lang.Error =>
+        throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName)
+    }
     val allowDifferentLgConfigK = value3.asInstanceOf[Boolean]
     if (!allowDifferentLgConfigK && sketch1.getLgConfigK != sketch2.getLgConfigK) {
-      throw new UnsupportedOperationException(
-        "Sketches have different lgConfigK values: " +
-        s"${sketch1.getLgConfigK} and ${sketch2.getLgConfigK}. " +
-        "Set allowDifferentLgConfigK to true to enable unions of " +
-        "different lgConfigK values.")
+      throw QueryExecutionErrors.hllUnionDifferentLgK(
+        sketch1.getLgConfigK, sketch2.getLgConfigK, function = prettyName)
     }
     val union = new Union(Math.min(sketch1.getLgConfigK, sketch2.getLgConfigK))
     union.update(sketch1)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index c31d01162c5..59b66bd4343 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2855,6 +2855,32 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
         "enumString" -> enumString))
   }
 
+  def hllInvalidLgK(function: String, min: Int, max: Int, value: String): Throwable = {
+    new SparkRuntimeException(
+      errorClass = "HLL_INVALID_LG_K",
+      messageParameters = Map(
+        "function" -> toSQLId(function),
+        "min" -> toSQLValue(min, IntegerType),
+        "max" -> toSQLValue(max, IntegerType),
+        "value" -> value))
+  }
+
+  def hllInvalidInputSketchBuffer(function: String): Throwable = {
+    new SparkRuntimeException(
+      errorClass = "HLL_INVALID_INPUT_SKETCH_BUFFER",
+      messageParameters = Map(
+        "function" -> toSQLId(function)))
+  }
+
+  def hllUnionDifferentLgK(left: Int, right: Int, function: String): Throwable = {
+    new SparkRuntimeException(
+      errorClass = "HLL_UNION_DIFFERENT_LG_K",
+      messageParameters = Map(
+        "left" -> toSQLValue(left, IntegerType),
+        "right" -> toSQLValue(right, IntegerType),
+        "function" -> toSQLId(function)))
+  }
+
   def mergeCardinalityViolationError(): SparkRuntimeException = {
     new SparkRuntimeException(
       errorClass = "MERGE_CARDINALITY_VIOLATION",
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out
new file mode 100644
index 00000000000..58391c0054c
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out
@@ -0,0 +1,215 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+DROP TABLE IF EXISTS t1
+-- !query analysis
+DropTable true, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t1
+
+
+-- !query
+CREATE TABLE t1 USING JSON AS VALUES (0), (1), (2), (2), (2), (3), (4) as tab(col)
+-- !query analysis
+CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`t1`, ErrorIfExists, [col]
+   +- SubqueryAlias tab
+      +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_sketch_agg(col)) AS result FROM t1
+-- !query analysis
+Aggregate [hll_sketch_estimate(hll_sketch_agg(col#x, 12, 0, 0)) AS result#xL]
++- SubqueryAlias spark_catalog.default.t1
+   +- Relation spark_catalog.default.t1[col#x] json
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_sketch_agg(col, 12))
+FROM VALUES (50), (60), (60), (60), (75), (100) tab(col)
+-- !query analysis
+Aggregate [hll_sketch_estimate(hll_sketch_agg(col#x, 12, 0, 0)) AS hll_sketch_estimate(hll_sketch_agg(col, 12))#xL]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_sketch_agg(col))
+FROM VALUES ('abc'), ('def'), ('abc'), ('ghi'), ('abc') tab(col)
+-- !query analysis
+Aggregate [hll_sketch_estimate(hll_sketch_agg(col#x, 12, 0, 0)) AS hll_sketch_estimate(hll_sketch_agg(col, 12))#xL]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_sketch_estimate(
+  hll_union(
+    hll_sketch_agg(col1),
+    hll_sketch_agg(col2)))
+  FROM VALUES
+    (1, 4),
+    (1, 4),
+    (2, 5),
+    (2, 5),
+    (3, 6) AS tab(col1, col2)
+-- !query analysis
+Aggregate [hll_sketch_estimate(hll_union(hll_sketch_agg(col1#x, 12, 0, 0), hll_sketch_agg(col2#x, 12, 0, 0), false)) AS hll_sketch_estimate(hll_union(hll_sketch_agg(col1, 12), hll_sketch_agg(col2, 12), false))#xL]
++- SubqueryAlias tab
+   +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_union_agg(sketch, true))
+    FROM (SELECT hll_sketch_agg(col) as sketch
+            FROM VALUES (1) AS tab(col)
+          UNION ALL
+          SELECT hll_sketch_agg(col, 20) as sketch
+            FROM VALUES (1) AS tab(col))
+-- !query analysis
+Aggregate [hll_sketch_estimate(hll_union_agg(sketch#x, true, 0, 0)) AS hll_sketch_estimate(hll_union_agg(sketch, true))#xL]
++- SubqueryAlias __auto_generated_subquery_name
+   +- Union false, false
+      :- Aggregate [hll_sketch_agg(col#x, 12, 0, 0) AS sketch#x]
+      :  +- SubqueryAlias tab
+      :     +- LocalRelation [col#x]
+      +- Aggregate [hll_sketch_agg(col#x, 20, 0, 0) AS sketch#x]
+         +- SubqueryAlias tab
+            +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_sketch_agg(col)
+FROM VALUES (ARRAY(1, 2)), (ARRAY(3, 4)) tab(col)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"col\"",
+    "inputType" : "\"ARRAY<INT>\"",
+    "paramIndex" : "1",
+    "requiredType" : "(\"INT\" or \"BIGINT\" or \"STRING\" or \"BINARY\")",
+    "sqlExpr" : "\"hll_sketch_agg(col, 12)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 26,
+    "fragment" : "hll_sketch_agg(col)"
+  } ]
+}
+
+
+-- !query
+SELECT hll_sketch_agg(col, 2)
+FROM VALUES (50), (60), (60) tab(col)
+-- !query analysis
+Aggregate [hll_sketch_agg(col#x, 2, 0, 0) AS hll_sketch_agg(col, 2)#x]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_sketch_agg(col, 40)
+FROM VALUES (50), (60), (60) tab(col)
+-- !query analysis
+Aggregate [hll_sketch_agg(col#x, 40, 0, 0) AS hll_sketch_agg(col, 40)#x]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_union(
+    hll_sketch_agg(col1, 12),
+    hll_sketch_agg(col2, 13))
+  FROM VALUES
+    (1, 4),
+    (1, 4),
+    (2, 5),
+    (2, 5),
+    (3, 6) AS tab(col1, col2)
+-- !query analysis
+Aggregate [hll_union(hll_sketch_agg(col1#x, 12, 0, 0), hll_sketch_agg(col2#x, 13, 0, 0), false) AS hll_union(hll_sketch_agg(col1, 12), hll_sketch_agg(col2, 13), false)#x]
++- SubqueryAlias tab
+   +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT hll_union_agg(sketch, false)
+FROM (SELECT hll_sketch_agg(col, 12) as sketch
+        FROM VALUES (1) AS tab(col)
+      UNION ALL
+      SELECT hll_sketch_agg(col, 20) as sketch
+        FROM VALUES (1) AS tab(col))
+-- !query analysis
+Aggregate [hll_union_agg(sketch#x, false, 0, 0) AS hll_union_agg(sketch, false)#x]
++- SubqueryAlias __auto_generated_subquery_name
+   +- Union false, false
+      :- Aggregate [hll_sketch_agg(col#x, 12, 0, 0) AS sketch#x]
+      :  +- SubqueryAlias tab
+      :     +- LocalRelation [col#x]
+      +- Aggregate [hll_sketch_agg(col#x, 20, 0, 0) AS sketch#x]
+         +- SubqueryAlias tab
+            +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_union(1, 2)
+  FROM VALUES
+    (1, 4),
+    (1, 4),
+    (2, 5),
+    (2, 5),
+    (3, 6) AS tab(col1, col2)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"1\"",
+    "inputType" : "\"INT\"",
+    "paramIndex" : "1",
+    "requiredType" : "\"BINARY\"",
+    "sqlExpr" : "\"hll_union(1, 2, false)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 22,
+    "fragment" : "hll_union(1, 2)"
+  } ]
+}
+
+
+-- !query
+SELECT hll_sketch_estimate(CAST ('abc' AS BINARY))
+-- !query analysis
+Project [hll_sketch_estimate(cast(abc as binary)) AS hll_sketch_estimate(CAST(abc AS BINARY))#xL]
++- OneRowRelation
+
+
+-- !query
+SELECT hll_union(CAST ('abc' AS BINARY), CAST ('def' AS BINARY))
+-- !query analysis
+Project [hll_union(cast(abc as binary), cast(def as binary), false) AS hll_union(CAST(abc AS BINARY), CAST(def AS BINARY), false)#x]
++- OneRowRelation
+
+
+-- !query
+SELECT hll_union_agg(buffer, false)
+FROM (SELECT CAST('abc' AS BINARY) AS buffer)
+-- !query analysis
+Aggregate [hll_union_agg(buffer#x, false, 0, 0) AS hll_union_agg(buffer, false)#x]
++- SubqueryAlias __auto_generated_subquery_name
+   +- Project [cast(abc as binary) AS buffer#x]
+      +- OneRowRelation
+
+
+-- !query
+DROP TABLE IF EXISTS t1
+-- !query analysis
+DropTable true, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t1
diff --git a/sql/core/src/test/resources/sql-tests/inputs/hll.sql b/sql/core/src/test/resources/sql-tests/inputs/hll.sql
new file mode 100644
index 00000000000..a0c29cb25a5
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/hll.sql
@@ -0,0 +1,76 @@
+-- Positive test cases
+-- Create a table with some testing data.
+DROP TABLE IF EXISTS t1;
+CREATE TABLE t1 USING JSON AS VALUES (0), (1), (2), (2), (2), (3), (4) as tab(col);
+
+SELECT hll_sketch_estimate(hll_sketch_agg(col)) AS result FROM t1;
+
+SELECT hll_sketch_estimate(hll_sketch_agg(col, 12))
+FROM VALUES (50), (60), (60), (60), (75), (100) tab(col);
+
+SELECT hll_sketch_estimate(hll_sketch_agg(col))
+FROM VALUES ('abc'), ('def'), ('abc'), ('ghi'), ('abc') tab(col);
+
+SELECT hll_sketch_estimate(
+  hll_union(
+    hll_sketch_agg(col1),
+    hll_sketch_agg(col2)))
+  FROM VALUES
+    (1, 4),
+    (1, 4),
+    (2, 5),
+    (2, 5),
+    (3, 6) AS tab(col1, col2);
+
+SELECT hll_sketch_estimate(hll_union_agg(sketch, true))
+    FROM (SELECT hll_sketch_agg(col) as sketch
+            FROM VALUES (1) AS tab(col)
+          UNION ALL
+          SELECT hll_sketch_agg(col, 20) as sketch
+            FROM VALUES (1) AS tab(col));
+
+-- Negative test cases
+SELECT hll_sketch_agg(col)
+FROM VALUES (ARRAY(1, 2)), (ARRAY(3, 4)) tab(col);
+
+SELECT hll_sketch_agg(col, 2)
+FROM VALUES (50), (60), (60) tab(col);
+
+SELECT hll_sketch_agg(col, 40)
+FROM VALUES (50), (60), (60) tab(col);
+
+SELECT hll_union(
+    hll_sketch_agg(col1, 12),
+    hll_sketch_agg(col2, 13))
+  FROM VALUES
+    (1, 4),
+    (1, 4),
+    (2, 5),
+    (2, 5),
+    (3, 6) AS tab(col1, col2);
+
+SELECT hll_union_agg(sketch, false)
+FROM (SELECT hll_sketch_agg(col, 12) as sketch
+        FROM VALUES (1) AS tab(col)
+      UNION ALL
+      SELECT hll_sketch_agg(col, 20) as sketch
+        FROM VALUES (1) AS tab(col));
+
+SELECT hll_union(1, 2)
+  FROM VALUES
+    (1, 4),
+    (1, 4),
+    (2, 5),
+    (2, 5),
+    (3, 6) AS tab(col1, col2);
+
+-- The HLL functions receive invalid buffers as inputs.
+SELECT hll_sketch_estimate(CAST ('abc' AS BINARY));
+
+SELECT hll_union(CAST ('abc' AS BINARY), CAST ('def' AS BINARY));
+
+SELECT hll_union_agg(buffer, false)
+FROM (SELECT CAST('abc' AS BINARY) AS buffer);
+
+-- Clean up
+DROP TABLE IF EXISTS t1;
diff --git a/sql/core/src/test/resources/sql-tests/results/hll.sql.out b/sql/core/src/test/resources/sql-tests/results/hll.sql.out
new file mode 100644
index 00000000000..c8a2e9a2faf
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/hll.sql.out
@@ -0,0 +1,262 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+DROP TABLE IF EXISTS t1
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TABLE t1 USING JSON AS VALUES (0), (1), (2), (2), (2), (3), (4) as tab(col)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_sketch_agg(col)) AS result FROM t1
+-- !query schema
+struct<result:bigint>
+-- !query output
+5
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_sketch_agg(col, 12))
+FROM VALUES (50), (60), (60), (60), (75), (100) tab(col)
+-- !query schema
+struct<hll_sketch_estimate(hll_sketch_agg(col, 12)):bigint>
+-- !query output
+4
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_sketch_agg(col))
+FROM VALUES ('abc'), ('def'), ('abc'), ('ghi'), ('abc') tab(col)
+-- !query schema
+struct<hll_sketch_estimate(hll_sketch_agg(col, 12)):bigint>
+-- !query output
+3
+
+
+-- !query
+SELECT hll_sketch_estimate(
+  hll_union(
+    hll_sketch_agg(col1),
+    hll_sketch_agg(col2)))
+  FROM VALUES
+    (1, 4),
+    (1, 4),
+    (2, 5),
+    (2, 5),
+    (3, 6) AS tab(col1, col2)
+-- !query schema
+struct<hll_sketch_estimate(hll_union(hll_sketch_agg(col1, 12), hll_sketch_agg(col2, 12), false)):bigint>
+-- !query output
+6
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_union_agg(sketch, true))
+    FROM (SELECT hll_sketch_agg(col) as sketch
+            FROM VALUES (1) AS tab(col)
+          UNION ALL
+          SELECT hll_sketch_agg(col, 20) as sketch
+            FROM VALUES (1) AS tab(col))
+-- !query schema
+struct<hll_sketch_estimate(hll_union_agg(sketch, true)):bigint>
+-- !query output
+1
+
+
+-- !query
+SELECT hll_sketch_agg(col)
+FROM VALUES (ARRAY(1, 2)), (ARRAY(3, 4)) tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"col\"",
+    "inputType" : "\"ARRAY<INT>\"",
+    "paramIndex" : "1",
+    "requiredType" : "(\"INT\" or \"BIGINT\" or \"STRING\" or \"BINARY\")",
+    "sqlExpr" : "\"hll_sketch_agg(col, 12)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 26,
+    "fragment" : "hll_sketch_agg(col)"
+  } ]
+}
+
+
+-- !query
+SELECT hll_sketch_agg(col, 2)
+FROM VALUES (50), (60), (60) tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "HLL_INVALID_LG_K",
+  "messageParameters" : {
+    "function" : "`hll_sketch_agg`",
+    "max" : "21",
+    "min" : "4",
+    "value" : "2"
+  }
+}
+
+
+-- !query
+SELECT hll_sketch_agg(col, 40)
+FROM VALUES (50), (60), (60) tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "HLL_INVALID_LG_K",
+  "messageParameters" : {
+    "function" : "`hll_sketch_agg`",
+    "max" : "21",
+    "min" : "4",
+    "value" : "40"
+  }
+}
+
+
+-- !query
+SELECT hll_union(
+    hll_sketch_agg(col1, 12),
+    hll_sketch_agg(col2, 13))
+  FROM VALUES
+    (1, 4),
+    (1, 4),
+    (2, 5),
+    (2, 5),
+    (3, 6) AS tab(col1, col2)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "HLL_UNION_DIFFERENT_LG_K",
+  "messageParameters" : {
+    "function" : "`hll_union`",
+    "left" : "12",
+    "right" : "13"
+  }
+}
+
+
+-- !query
+SELECT hll_union_agg(sketch, false)
+FROM (SELECT hll_sketch_agg(col, 12) as sketch
+        FROM VALUES (1) AS tab(col)
+      UNION ALL
+      SELECT hll_sketch_agg(col, 20) as sketch
+        FROM VALUES (1) AS tab(col))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "HLL_UNION_DIFFERENT_LG_K",
+  "messageParameters" : {
+    "function" : "`hll_union_agg`",
+    "left" : "12",
+    "right" : "20"
+  }
+}
+
+
+-- !query
+SELECT hll_union(1, 2)
+  FROM VALUES
+    (1, 4),
+    (1, 4),
+    (2, 5),
+    (2, 5),
+    (3, 6) AS tab(col1, col2)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"1\"",
+    "inputType" : "\"INT\"",
+    "paramIndex" : "1",
+    "requiredType" : "\"BINARY\"",
+    "sqlExpr" : "\"hll_union(1, 2, false)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 22,
+    "fragment" : "hll_union(1, 2)"
+  } ]
+}
+
+
+-- !query
+SELECT hll_sketch_estimate(CAST ('abc' AS BINARY))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "HLL_INVALID_INPUT_SKETCH_BUFFER",
+  "messageParameters" : {
+    "function" : "`hll_sketch_estimate`"
+  }
+}
+
+
+-- !query
+SELECT hll_union(CAST ('abc' AS BINARY), CAST ('def' AS BINARY))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "HLL_INVALID_INPUT_SKETCH_BUFFER",
+  "messageParameters" : {
+    "function" : "`hll_union`"
+  }
+}
+
+
+-- !query
+SELECT hll_union_agg(buffer, false)
+FROM (SELECT CAST('abc' AS BINARY) AS buffer)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+  "errorClass" : "HLL_INVALID_INPUT_SKETCH_BUFFER",
+  "messageParameters" : {
+    "function" : "`hll_union_agg`"
+  }
+}
+
+
+-- !query
+DROP TABLE IF EXISTS t1
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 577d86c2d9a..b6e1f5af011 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -23,7 +23,7 @@ import scala.util.Random
 
 import org.scalatest.matchers.must.Matchers.the
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
@@ -1855,149 +1855,211 @@ class DataFrameAggregateSuite extends QueryTest
     df2.createOrReplaceTempView("df2")
 
     // validate that the functions error out when lgConfigK < 4 or > 24
-    val error0 = intercept[SparkException] {
-      val res = df1.groupBy("id")
-        .agg(
-          hll_sketch_agg("value", 1).as("hllsketch")
-        )
-      checkAnswer(res, Nil)
-    }
-    assert(error0.toString contains "SketchesArgumentException")
-
-    val error1 = intercept[SparkException] {
-      val res = df1.groupBy("id")
-        .agg(
-          hll_sketch_agg("value", 25).as("hllsketch")
-        )
-      checkAnswer(res, Nil)
-    }
-    assert(error1.toString contains "SketchesArgumentException")
+    checkError(
+      exception = intercept[SparkException] {
+        val res = df1.groupBy("id")
+          .agg(
+            hll_sketch_agg("value", 1).as("hllsketch")
+          )
+        checkAnswer(res, Nil)
+      }.getCause.asInstanceOf[SparkThrowable],
+      errorClass = "HLL_INVALID_LG_K",
+      parameters = Map(
+        "function" -> "`hll_sketch_agg`",
+        "min" -> "4",
+        "max" -> "21",
+        "value" -> "1"
+      ))
+
+    checkError(
+      exception = intercept[SparkException] {
+        val res = df1.groupBy("id")
+          .agg(
+            hll_sketch_agg("value", 25).as("hllsketch")
+          )
+        checkAnswer(res, Nil)
+      }.getCause.asInstanceOf[SparkThrowable],
+      errorClass = "HLL_INVALID_LG_K",
+      parameters = Map(
+        "function" -> "`hll_sketch_agg`",
+        "min" -> "4",
+        "max" -> "21",
+        "value" -> "25"
+      ))
 
     // validate that unions error out by default for different lgConfigK sketches
-    val error2 = intercept[SparkException] {
-      val i1 = df1.groupBy("id")
-        .agg(
-          hll_sketch_agg("value").as("hllsketch_left")
-        )
-      val i2 = df2.groupBy("id")
-        .agg(
-          hll_sketch_agg("value", 20).as("hllsketch_right")
-        )
-      val res = i1.join(i2).withColumn("union", hll_union("hllsketch_left", "hllsketch_right"))
-      checkAnswer(res, Nil)
-    }
-    assert(error2.toString contains "UnsupportedOperationException")
-
-    val error3 = intercept[SparkException] {
-      val i1 = df1.groupBy("id")
-        .agg(
-          hll_sketch_agg("value").as("hllsketch")
-        )
-      val i2 = df2.groupBy("id")
-        .agg(
-          hll_sketch_agg("value", 20).as("hllsketch")
-        )
-      val res = i1.union(i2).groupBy("id")
-        .agg(
-          hll_union_agg("hllsketch")
-        )
-      checkAnswer(res, Nil)
-    }
-    assert(error3.toString contains "UnsupportedOperationException")
+    checkError(
+      exception = intercept[SparkException] {
+        val i1 = df1.groupBy("id")
+          .agg(
+            hll_sketch_agg("value").as("hllsketch_left")
+          )
+        val i2 = df2.groupBy("id")
+          .agg(
+            hll_sketch_agg("value", 20).as("hllsketch_right")
+          )
+        val res = i1.join(i2).withColumn("union", hll_union("hllsketch_left", "hllsketch_right"))
+        checkAnswer(res, Nil)
+      }.getCause.asInstanceOf[SparkThrowable],
+      errorClass = "HLL_UNION_DIFFERENT_LG_K",
+      parameters = Map(
+        "left" -> "12",
+        "right" -> "20",
+        "function" -> "`hll_union`"
+      ))
+
+    checkError(
+      exception = intercept[SparkException] {
+        val i1 = df1.groupBy("id")
+          .agg(
+            hll_sketch_agg("value").as("hllsketch")
+          )
+        val i2 = df2.groupBy("id")
+          .agg(
+            hll_sketch_agg("value", 20).as("hllsketch")
+          )
+        val res = i1.union(i2).groupBy("id")
+          .agg(
+            hll_union_agg("hllsketch")
+          )
+        checkAnswer(res, Nil)
+      }.getCause.asInstanceOf[SparkThrowable],
+      errorClass = "HLL_UNION_DIFFERENT_LG_K",
+      parameters = Map(
+        "left" -> "12",
+        "right" -> "20",
+        "function" -> "`hll_union_agg`"
+      ))
 
     // validate that the functions error out when provided unexpected types
-    val error4 = intercept[AnalysisException] {
-      val res = sql(
-        """
-          |select
-          | id,
-          | hll_sketch_agg(value, 'text')
-          |from
-          | df1
-          |group by 1
-          |""".stripMargin)
-      checkAnswer(res, Nil)
-    }
-    assert(error4.toString contains "UNEXPECTED_INPUT_TYPE")
-
-    val error5 = intercept[AnalysisException] {
-      val res = sql(
-        """with sketch_cte as (
-          |select
-          | id,
-          | hll_sketch_agg(value) as sketch
-          |from
-          | df1
-          |group by 1
-          |)
-          |
-          |select hll_union_agg(sketch, 'Hll_4') from sketch_cte
-          |""".stripMargin)
-      checkAnswer(res, Nil)
-    }
-    assert(error5.toString contains "UNEXPECTED_INPUT_TYPE")
+    checkError(
+      exception = intercept[AnalysisException] {
+        val res = sql(
+          """
+            |select
+            | id,
+            | hll_sketch_agg(value, 'text')
+            |from
+            | df1
+            |group by 1
+            |""".stripMargin)
+        checkAnswer(res, Nil)
+      },
+      errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+      parameters = Map(
+        "sqlExpr" -> "\"hll_sketch_agg(value, text)\"",
+        "paramIndex" -> "2",
+        "inputSql" -> "\"text\"",
+        "inputType" -> "\"STRING\"",
+        "requiredType" -> "\"INT\""
+      ),
+      context = ExpectedContext(
+        fragment = "hll_sketch_agg(value, 'text')",
+        start = 14,
+        stop = 42))
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        val res = sql(
+          """with sketch_cte as (
+            |select
+            | id,
+            | hll_sketch_agg(value) as sketch
+            |from
+            | df1
+            |group by 1
+            |)
+            |
+            |select hll_union_agg(sketch, 'Hll_4') from sketch_cte
+            |""".stripMargin)
+        checkAnswer(res, Nil)
+      },
+      errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+      parameters = Map(
+        "sqlExpr" -> "\"hll_union_agg(sketch, Hll_4)\"",
+        "paramIndex" -> "2",
+        "inputSql" -> "\"Hll_4\"",
+        "inputType" -> "\"STRING\"",
+        "requiredType" -> "\"BOOLEAN\""
+      ),
+      context = ExpectedContext(
+        fragment = "hll_union_agg(sketch, 'Hll_4')",
+        start = 97,
+        stop = 126))
 
     // validate that unions error out by default for different lgConfigK sketches
-    val error6 = intercept[SparkException] {
-      val res = sql(
-        """with cte1 as (
-          |select
-          | id,
-          | hll_sketch_agg(value) as sketch
-          |from
-          | df1
-          |group by 1
-          |),
-          |
-          |cte2 as (
-          |select
-          | id,
-          | hll_sketch_agg(value, 20) as sketch
-          |from
-          | df2
-          |group by 1
-          |)
-          |
-          |select
-          | cte1.id,
-          | hll_union(cte1.sketch, cte2.sketch) as sketch
-          |from
-          | cte1 join cte2 on cte1.id = cte2.id
-          |""".stripMargin)
-      checkAnswer(res, Nil)
-    }
-    assert(error6.toString contains "UnsupportedOperationException")
-
-    val error7 = intercept[SparkException] {
-      val res = sql(
-        """with cte1 as (
-          |select
-          | id,
-          | hll_sketch_agg(value) as sketch
-          |from
-          | df1
-          |group by 1
-          |),
-          |
-          |cte2 as (
-          |select
-          | id,
-          | hll_sketch_agg(value, 20) as sketch
-          |from
-          | df2
-          |group by 1
-          |)
-          |
-          |select
-          | id,
-          | hll_union_agg(sketch) as sketch
-          |from
-          | (select * from cte1 union all select * from cte2)
-          |group by 1
-          |""".stripMargin)
-      checkAnswer(res, Nil)
-    }
-    assert(error7.toString contains "UnsupportedOperationException")
+    checkError(
+      exception = intercept[SparkException] {
+        val res = sql(
+          """with cte1 as (
+            |select
+            | id,
+            | hll_sketch_agg(value) as sketch
+            |from
+            | df1
+            |group by 1
+            |),
+            |
+            |cte2 as (
+            |select
+            | id,
+            | hll_sketch_agg(value, 20) as sketch
+            |from
+            | df2
+            |group by 1
+            |)
+            |
+            |select
+            | cte1.id,
+            | hll_union(cte1.sketch, cte2.sketch) as sketch
+            |from
+            | cte1 join cte2 on cte1.id = cte2.id
+            |""".stripMargin)
+        checkAnswer(res, Nil)
+      }.getCause.asInstanceOf[SparkThrowable],
+      errorClass = "HLL_UNION_DIFFERENT_LG_K",
+      parameters = Map(
+        "left" -> "12",
+        "right" -> "20",
+        "function" -> "`hll_union`"
+      ))
+
+    checkError(
+      exception = intercept[SparkException] {
+        val res = sql(
+          """with cte1 as (
+            |select
+            | id,
+            | hll_sketch_agg(value) as sketch
+            |from
+            | df1
+            |group by 1
+            |),
+            |
+            |cte2 as (
+            |select
+            | id,
+            | hll_sketch_agg(value, 20) as sketch
+            |from
+            | df2
+            |group by 1
+            |)
+            |
+            |select
+            | id,
+            | hll_union_agg(sketch) as sketch
+            |from
+            | (select * from cte1 union all select * from cte2)
+            |group by 1
+            |""".stripMargin)
+        checkAnswer(res, Nil)
+      }.getCause.asInstanceOf[SparkThrowable],
+      errorClass = "HLL_UNION_DIFFERENT_LG_K",
+      parameters = Map(
+        "left" -> "12",
+        "right" -> "20",
+        "function" -> "`hll_union_agg`"
+      ))
   }
 
   test("SPARK-43876: Enable fast hashmap for distinct queries") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org