You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2023/06/30 16:44:27 UTC
[spark] branch master updated: [SPARK-43986][SQL] Create error classes for HyperLogLog function call failures
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ab67f461987 [SPARK-43986][SQL] Create error classes for HyperLogLog function call failures
ab67f461987 is described below
commit ab67f4619873f21b5dcf7f67658afce7e1028657
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Fri Jun 30 19:44:14 2023 +0300
[SPARK-43986][SQL] Create error classes for HyperLogLog function call failures
### What changes were proposed in this pull request?
This PR creates error classes for HyperLogLog function call failures.
### Why are the changes needed?
These replace the previous raw Java exceptions (such as `SketchesArgumentException` and `UnsupportedOperationException`) with Spark error classes, in order to improve the user experience and bring consistency with other parts of Spark.
### Does this PR introduce _any_ user-facing change?
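Yes, error messages change slightly: failures in the HLL functions are now reported with the new `HLL_INVALID_INPUT_SKETCH_BUFFER`, `HLL_INVALID_LG_K`, and `HLL_UNION_DIFFERENT_LG_K` error classes.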
### How was this patch tested?
This PR adds SQL query test files for the HLL functions and updates the existing tests in `DataFrameAggregateSuite` to check the new error classes.
Closes #41486 from dtenedor/hll-error-classes.
Authored-by: Daniel Tenedorio <da...@databricks.com>
Signed-off-by: Max Gekk <ma...@gmail.com>
---
.../src/main/resources/error/error-classes.json | 15 +
.../aggregate/datasketchesAggregates.scala | 71 +++--
.../expressions/datasketchesExpressions.scala | 29 +-
.../spark/sql/errors/QueryExecutionErrors.scala | 26 ++
.../sql-tests/analyzer-results/hll.sql.out | 215 +++++++++++++
.../src/test/resources/sql-tests/inputs/hll.sql | 76 +++++
.../test/resources/sql-tests/results/hll.sql.out | 262 ++++++++++++++++
.../apache/spark/sql/DataFrameAggregateSuite.scala | 338 ++++++++++++---------
8 files changed, 850 insertions(+), 182 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index db6b9a97012..abe88db1267 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -782,6 +782,21 @@
"The expression <sqlExpr> cannot be used as a grouping expression because its data type <dataType> is not an orderable data type."
]
},
+ "HLL_INVALID_INPUT_SKETCH_BUFFER" : {
+ "message" : [
+ "Invalid call to <function>; only valid HLL sketch buffers are supported as inputs (such as those produced by the `hll_sketch_agg` function)."
+ ]
+ },
+ "HLL_INVALID_LG_K" : {
+ "message" : [
+ "Invalid call to <function>; the `lgConfigK` value must be between <min> and <max>, inclusive: <value>."
+ ]
+ },
+ "HLL_UNION_DIFFERENT_LG_K" : {
+ "message" : [
+ "Sketches have different `lgConfigK` values: <left> and <right>. Set the `allowDifferentLgConfigK` parameter to true to call <function> with different `lgConfigK` values."
+ ]
+ },
"IDENTIFIER_TOO_MANY_NAME_PARTS" : {
"message" : [
"<identifier> is not a valid identifier as it has more than 2 name parts."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
index 8b24efe12b4..17c69f798d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
@@ -17,23 +17,23 @@
package org.apache.spark.sql.catalyst.expressions.aggregate
-import org.apache.datasketches.SketchesArgumentException
import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
import org.apache.datasketches.memory.Memory
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
import org.apache.spark.sql.catalyst.trees.BinaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.{AbstractDataType, BinaryType, BooleanType, DataType, IntegerType, LongType, StringType, TypeCollection}
import org.apache.spark.unsafe.types.UTF8String
/**
- * The HllSketchAgg function utilizes a Datasketches HllSketch instance to
- * count a probabilistic approximation of the number of unique values in
- * a given column, and outputs the binary representation of the HllSketch.
+ * The HllSketchAgg function utilizes a Datasketches HllSketch instance to count a probabilistic
+ * approximation of the number of unique values in a given column, and outputs the binary
+ * representation of the HllSketch.
*
- * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information.
*
* @param left child expression against which unique counting will occur
* @param right the log-base-2 of K, where K is the number of buckets or slots for the sketch
@@ -41,7 +41,7 @@ import org.apache.spark.unsafe.types.UTF8String
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = """
- _FUNC_(expr, lgConfigK) - Returns the HllSketch's updateable binary representation.
+ _FUNC_(expr, lgConfigK) - Returns the HllSketch's updatable binary representation.
`lgConfigK` (optional) the log-base-2 of K, with K is the number of buckets or
slots for the HllSketch. """,
examples = """
@@ -117,9 +117,9 @@ case class HllSketchAgg(
}
/**
- * Evaluate the input row and update the HllSketch instance with the row's value.
- * The update function only supports a subset of Spark SQL types, and an
- * UnsupportedOperationException will be thrown for unsupported types.
+ * Evaluate the input row and update the HllSketch instance with the row's value. The update
+ * function only supports a subset of Spark SQL types, and an exception will be thrown for
+ * unsupported types.
*
* @param sketch The HllSketch instance.
* @param input an input row
@@ -128,10 +128,10 @@ case class HllSketchAgg(
val v = left.eval(input)
if (v != null) {
left.dataType match {
- // Update implemented for a subset of types supported by HllSketch
- // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out
- // Leaving out support for Array types, as unique counting these aren't a common use case
- // Leaving out support for floating point types (IE DoubleType) due to imprecision
+ // This is implemented for a subset of input data types.
+ // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out.
+ // We leave out support for Array types, as unique counting these aren't a common use case.
+ // We leave out support for floating point types (such as DoubleType) due to imprecision.
// TODO: implement support for decimal/datetime/interval types
case IntegerType => sketch.update(v.asInstanceOf[Int])
case LongType => sketch.update(v.asInstanceOf[Long])
@@ -189,21 +189,20 @@ object HllSketchAgg {
private val minLgConfigK = 4
private val maxLgConfigK = 21
- // Replicate Datasketche's HllUtil's checkLgK implementation, as we can't reference it directly
+ // Replicate Datasketches' HllUtil's checkLgK implementation, as we can't reference it directly.
def checkLgK(lgConfigK: Int): Unit = {
if (lgConfigK < minLgConfigK || lgConfigK > maxLgConfigK) {
- throw new SketchesArgumentException(
- s"Log K must be between $minLgConfigK and $maxLgConfigK, inclusive: " + lgConfigK)
+ throw QueryExecutionErrors.hllInvalidLgK(function = "hll_sketch_agg",
+ min = minLgConfigK, max = maxLgConfigK, value = lgConfigK.toString)
}
}
}
/**
- * The HllUnionAgg function ingests and merges Datasketches HllSketch
- * instances previously produced by the HllSketchBinary function, and
- * outputs the merged HllSketch.
+ * The HllUnionAgg function ingests and merges Datasketches HllSketch instances previously produced
+ * by the HllSketchBinary function, and outputs the merged HllSketch.
*
- * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information.
*
* @param left Child expression against which unique counting will occur
* @param right Allow sketches with different lgConfigK values
@@ -286,20 +285,15 @@ case class HllUnionAgg(
}
/**
- * Helper method to compare lgConfigKs and throw an exception if
- * allowDifferentLgConfigK isn't true and configs don't match
+ * Helper method to compare lgConfigKs and throw an exception if `allowDifferentLgConfigK` isn't
+ * true and configs don't match.
*
* @param left An lgConfigK value
* @param right An lgConfigK value
*/
def compareLgConfigK(left: Int, right: Int): Unit = {
- if (!allowDifferentLgConfigK) {
- if (left != right) {
- throw new UnsupportedOperationException(
- s"Sketches have different lgConfigK values: $left and $right. " +
- "Set allowDifferentLgConfigK to true to enable unions of " +
- "different lgConfigK values.")
- }
+ if (!allowDifferentLgConfigK && left != right) {
+ throw QueryExecutionErrors.hllUnionDifferentLgK(left, right, function = prettyName)
}
}
@@ -314,13 +308,18 @@ case class HllUnionAgg(
if (v != null) {
left.dataType match {
case BinaryType =>
- val sketch = HllSketch.wrap(Memory.wrap(v.asInstanceOf[Array[Byte]]))
- val union = unionOption.getOrElse(new Union(sketch.getLgConfigK))
- compareLgConfigK(union.getLgConfigK, sketch.getLgConfigK)
- union.update(sketch)
- Some(union)
- case _ => throw new UnsupportedOperationException(
- s"A Union instance can only be updated with a valid HllSketch byte array")
+ try {
+ val sketch = HllSketch.wrap(Memory.wrap(v.asInstanceOf[Array[Byte]]))
+ val union = unionOption.getOrElse(new Union(sketch.getLgConfigK))
+ compareLgConfigK(union.getLgConfigK, sketch.getLgConfigK)
+ union.update(sketch)
+ Some(union)
+ } catch {
+ case _: java.lang.Error =>
+ throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName)
+ }
+ case _ =>
+ throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName)
}
} else {
unionOption
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala
index 2f1c865e12b..9e3fe52f534 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala
@@ -21,6 +21,7 @@ import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
import org.apache.datasketches.memory.Memory
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.{AbstractDataType, BinaryType, BooleanType, DataType, LongType}
@ExpressionDescription(
@@ -51,7 +52,12 @@ case class HllSketchEstimate(child: Expression)
override def nullSafeEval(input: Any): Any = {
val buffer = input.asInstanceOf[Array[Byte]]
- Math.round(HllSketch.heapify(Memory.wrap(buffer)).getEstimate)
+ try {
+ Math.round(HllSketch.heapify(Memory.wrap(buffer)).getEstimate)
+ } catch {
+ case _: java.lang.Error =>
+ throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName)
+ }
}
}
@@ -98,15 +104,22 @@ case class HllUnion(first: Expression, second: Expression, third: Expression)
override def dataType: DataType = BinaryType
override def nullSafeEval(value1: Any, value2: Any, value3: Any): Any = {
- val sketch1 = HllSketch.heapify(Memory.wrap(value1.asInstanceOf[Array[Byte]]))
- val sketch2 = HllSketch.heapify(Memory.wrap(value2.asInstanceOf[Array[Byte]]))
+ val sketch1 = try {
+ HllSketch.heapify(Memory.wrap(value1.asInstanceOf[Array[Byte]]))
+ } catch {
+ case _: java.lang.Error =>
+ throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName)
+ }
+ val sketch2 = try {
+ HllSketch.heapify(Memory.wrap(value2.asInstanceOf[Array[Byte]]))
+ } catch {
+ case _: java.lang.Error =>
+ throw QueryExecutionErrors.hllInvalidInputSketchBuffer(prettyName)
+ }
val allowDifferentLgConfigK = value3.asInstanceOf[Boolean]
if (!allowDifferentLgConfigK && sketch1.getLgConfigK != sketch2.getLgConfigK) {
- throw new UnsupportedOperationException(
- "Sketches have different lgConfigK values: " +
- s"${sketch1.getLgConfigK} and ${sketch2.getLgConfigK}. " +
- "Set allowDifferentLgConfigK to true to enable unions of " +
- "different lgConfigK values.")
+ throw QueryExecutionErrors.hllUnionDifferentLgK(
+ sketch1.getLgConfigK, sketch2.getLgConfigK, function = prettyName)
}
val union = new Union(Math.min(sketch1.getLgConfigK, sketch2.getLgConfigK))
union.update(sketch1)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index c31d01162c5..59b66bd4343 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2855,6 +2855,32 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
"enumString" -> enumString))
}
+ def hllInvalidLgK(function: String, min: Int, max: Int, value: String): Throwable = {
+ new SparkRuntimeException(
+ errorClass = "HLL_INVALID_LG_K",
+ messageParameters = Map(
+ "function" -> toSQLId(function),
+ "min" -> toSQLValue(min, IntegerType),
+ "max" -> toSQLValue(max, IntegerType),
+ "value" -> value))
+ }
+
+ def hllInvalidInputSketchBuffer(function: String): Throwable = {
+ new SparkRuntimeException(
+ errorClass = "HLL_INVALID_INPUT_SKETCH_BUFFER",
+ messageParameters = Map(
+ "function" -> toSQLId(function)))
+ }
+
+ def hllUnionDifferentLgK(left: Int, right: Int, function: String): Throwable = {
+ new SparkRuntimeException(
+ errorClass = "HLL_UNION_DIFFERENT_LG_K",
+ messageParameters = Map(
+ "left" -> toSQLValue(left, IntegerType),
+ "right" -> toSQLValue(right, IntegerType),
+ "function" -> toSQLId(function)))
+ }
+
def mergeCardinalityViolationError(): SparkRuntimeException = {
new SparkRuntimeException(
errorClass = "MERGE_CARDINALITY_VIOLATION",
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out
new file mode 100644
index 00000000000..58391c0054c
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out
@@ -0,0 +1,215 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+DROP TABLE IF EXISTS t1
+-- !query analysis
+DropTable true, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t1
+
+
+-- !query
+CREATE TABLE t1 USING JSON AS VALUES (0), (1), (2), (2), (2), (3), (4) as tab(col)
+-- !query analysis
+CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`t1`, ErrorIfExists, [col]
+ +- SubqueryAlias tab
+ +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_sketch_agg(col)) AS result FROM t1
+-- !query analysis
+Aggregate [hll_sketch_estimate(hll_sketch_agg(col#x, 12, 0, 0)) AS result#xL]
++- SubqueryAlias spark_catalog.default.t1
+ +- Relation spark_catalog.default.t1[col#x] json
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_sketch_agg(col, 12))
+FROM VALUES (50), (60), (60), (60), (75), (100) tab(col)
+-- !query analysis
+Aggregate [hll_sketch_estimate(hll_sketch_agg(col#x, 12, 0, 0)) AS hll_sketch_estimate(hll_sketch_agg(col, 12))#xL]
++- SubqueryAlias tab
+ +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_sketch_agg(col))
+FROM VALUES ('abc'), ('def'), ('abc'), ('ghi'), ('abc') tab(col)
+-- !query analysis
+Aggregate [hll_sketch_estimate(hll_sketch_agg(col#x, 12, 0, 0)) AS hll_sketch_estimate(hll_sketch_agg(col, 12))#xL]
++- SubqueryAlias tab
+ +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_sketch_estimate(
+ hll_union(
+ hll_sketch_agg(col1),
+ hll_sketch_agg(col2)))
+ FROM VALUES
+ (1, 4),
+ (1, 4),
+ (2, 5),
+ (2, 5),
+ (3, 6) AS tab(col1, col2)
+-- !query analysis
+Aggregate [hll_sketch_estimate(hll_union(hll_sketch_agg(col1#x, 12, 0, 0), hll_sketch_agg(col2#x, 12, 0, 0), false)) AS hll_sketch_estimate(hll_union(hll_sketch_agg(col1, 12), hll_sketch_agg(col2, 12), false))#xL]
++- SubqueryAlias tab
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_union_agg(sketch, true))
+ FROM (SELECT hll_sketch_agg(col) as sketch
+ FROM VALUES (1) AS tab(col)
+ UNION ALL
+ SELECT hll_sketch_agg(col, 20) as sketch
+ FROM VALUES (1) AS tab(col))
+-- !query analysis
+Aggregate [hll_sketch_estimate(hll_union_agg(sketch#x, true, 0, 0)) AS hll_sketch_estimate(hll_union_agg(sketch, true))#xL]
++- SubqueryAlias __auto_generated_subquery_name
+ +- Union false, false
+ :- Aggregate [hll_sketch_agg(col#x, 12, 0, 0) AS sketch#x]
+ : +- SubqueryAlias tab
+ : +- LocalRelation [col#x]
+ +- Aggregate [hll_sketch_agg(col#x, 20, 0, 0) AS sketch#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_sketch_agg(col)
+FROM VALUES (ARRAY(1, 2)), (ARRAY(3, 4)) tab(col)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"col\"",
+ "inputType" : "\"ARRAY<INT>\"",
+ "paramIndex" : "1",
+ "requiredType" : "(\"INT\" or \"BIGINT\" or \"STRING\" or \"BINARY\")",
+ "sqlExpr" : "\"hll_sketch_agg(col, 12)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 26,
+ "fragment" : "hll_sketch_agg(col)"
+ } ]
+}
+
+
+-- !query
+SELECT hll_sketch_agg(col, 2)
+FROM VALUES (50), (60), (60) tab(col)
+-- !query analysis
+Aggregate [hll_sketch_agg(col#x, 2, 0, 0) AS hll_sketch_agg(col, 2)#x]
++- SubqueryAlias tab
+ +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_sketch_agg(col, 40)
+FROM VALUES (50), (60), (60) tab(col)
+-- !query analysis
+Aggregate [hll_sketch_agg(col#x, 40, 0, 0) AS hll_sketch_agg(col, 40)#x]
++- SubqueryAlias tab
+ +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_union(
+ hll_sketch_agg(col1, 12),
+ hll_sketch_agg(col2, 13))
+ FROM VALUES
+ (1, 4),
+ (1, 4),
+ (2, 5),
+ (2, 5),
+ (3, 6) AS tab(col1, col2)
+-- !query analysis
+Aggregate [hll_union(hll_sketch_agg(col1#x, 12, 0, 0), hll_sketch_agg(col2#x, 13, 0, 0), false) AS hll_union(hll_sketch_agg(col1, 12), hll_sketch_agg(col2, 13), false)#x]
++- SubqueryAlias tab
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT hll_union_agg(sketch, false)
+FROM (SELECT hll_sketch_agg(col, 12) as sketch
+ FROM VALUES (1) AS tab(col)
+ UNION ALL
+ SELECT hll_sketch_agg(col, 20) as sketch
+ FROM VALUES (1) AS tab(col))
+-- !query analysis
+Aggregate [hll_union_agg(sketch#x, false, 0, 0) AS hll_union_agg(sketch, false)#x]
++- SubqueryAlias __auto_generated_subquery_name
+ +- Union false, false
+ :- Aggregate [hll_sketch_agg(col#x, 12, 0, 0) AS sketch#x]
+ : +- SubqueryAlias tab
+ : +- LocalRelation [col#x]
+ +- Aggregate [hll_sketch_agg(col#x, 20, 0, 0) AS sketch#x]
+ +- SubqueryAlias tab
+ +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_union(1, 2)
+ FROM VALUES
+ (1, 4),
+ (1, 4),
+ (2, 5),
+ (2, 5),
+ (3, 6) AS tab(col1, col2)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"1\"",
+ "inputType" : "\"INT\"",
+ "paramIndex" : "1",
+ "requiredType" : "\"BINARY\"",
+ "sqlExpr" : "\"hll_union(1, 2, false)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 22,
+ "fragment" : "hll_union(1, 2)"
+ } ]
+}
+
+
+-- !query
+SELECT hll_sketch_estimate(CAST ('abc' AS BINARY))
+-- !query analysis
+Project [hll_sketch_estimate(cast(abc as binary)) AS hll_sketch_estimate(CAST(abc AS BINARY))#xL]
++- OneRowRelation
+
+
+-- !query
+SELECT hll_union(CAST ('abc' AS BINARY), CAST ('def' AS BINARY))
+-- !query analysis
+Project [hll_union(cast(abc as binary), cast(def as binary), false) AS hll_union(CAST(abc AS BINARY), CAST(def AS BINARY), false)#x]
++- OneRowRelation
+
+
+-- !query
+SELECT hll_union_agg(buffer, false)
+FROM (SELECT CAST('abc' AS BINARY) AS buffer)
+-- !query analysis
+Aggregate [hll_union_agg(buffer#x, false, 0, 0) AS hll_union_agg(buffer, false)#x]
++- SubqueryAlias __auto_generated_subquery_name
+ +- Project [cast(abc as binary) AS buffer#x]
+ +- OneRowRelation
+
+
+-- !query
+DROP TABLE IF EXISTS t1
+-- !query analysis
+DropTable true, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t1
diff --git a/sql/core/src/test/resources/sql-tests/inputs/hll.sql b/sql/core/src/test/resources/sql-tests/inputs/hll.sql
new file mode 100644
index 00000000000..a0c29cb25a5
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/hll.sql
@@ -0,0 +1,76 @@
+-- Positive test cases
+-- Create a table with some testing data.
+DROP TABLE IF EXISTS t1;
+CREATE TABLE t1 USING JSON AS VALUES (0), (1), (2), (2), (2), (3), (4) as tab(col);
+
+SELECT hll_sketch_estimate(hll_sketch_agg(col)) AS result FROM t1;
+
+SELECT hll_sketch_estimate(hll_sketch_agg(col, 12))
+FROM VALUES (50), (60), (60), (60), (75), (100) tab(col);
+
+SELECT hll_sketch_estimate(hll_sketch_agg(col))
+FROM VALUES ('abc'), ('def'), ('abc'), ('ghi'), ('abc') tab(col);
+
+SELECT hll_sketch_estimate(
+ hll_union(
+ hll_sketch_agg(col1),
+ hll_sketch_agg(col2)))
+ FROM VALUES
+ (1, 4),
+ (1, 4),
+ (2, 5),
+ (2, 5),
+ (3, 6) AS tab(col1, col2);
+
+SELECT hll_sketch_estimate(hll_union_agg(sketch, true))
+ FROM (SELECT hll_sketch_agg(col) as sketch
+ FROM VALUES (1) AS tab(col)
+ UNION ALL
+ SELECT hll_sketch_agg(col, 20) as sketch
+ FROM VALUES (1) AS tab(col));
+
+-- Negative test cases
+SELECT hll_sketch_agg(col)
+FROM VALUES (ARRAY(1, 2)), (ARRAY(3, 4)) tab(col);
+
+SELECT hll_sketch_agg(col, 2)
+FROM VALUES (50), (60), (60) tab(col);
+
+SELECT hll_sketch_agg(col, 40)
+FROM VALUES (50), (60), (60) tab(col);
+
+SELECT hll_union(
+ hll_sketch_agg(col1, 12),
+ hll_sketch_agg(col2, 13))
+ FROM VALUES
+ (1, 4),
+ (1, 4),
+ (2, 5),
+ (2, 5),
+ (3, 6) AS tab(col1, col2);
+
+SELECT hll_union_agg(sketch, false)
+FROM (SELECT hll_sketch_agg(col, 12) as sketch
+ FROM VALUES (1) AS tab(col)
+ UNION ALL
+ SELECT hll_sketch_agg(col, 20) as sketch
+ FROM VALUES (1) AS tab(col));
+
+SELECT hll_union(1, 2)
+ FROM VALUES
+ (1, 4),
+ (1, 4),
+ (2, 5),
+ (2, 5),
+ (3, 6) AS tab(col1, col2);
+
+-- The HLL functions receive invalid buffers as inputs.
+SELECT hll_sketch_estimate(CAST ('abc' AS BINARY));
+
+SELECT hll_union(CAST ('abc' AS BINARY), CAST ('def' AS BINARY));
+
+SELECT hll_union_agg(buffer, false)
+FROM (SELECT CAST('abc' AS BINARY) AS buffer);
+
+-- Clean up
+DROP TABLE IF EXISTS t1;
diff --git a/sql/core/src/test/resources/sql-tests/results/hll.sql.out b/sql/core/src/test/resources/sql-tests/results/hll.sql.out
new file mode 100644
index 00000000000..c8a2e9a2faf
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/hll.sql.out
@@ -0,0 +1,262 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+DROP TABLE IF EXISTS t1
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TABLE t1 USING JSON AS VALUES (0), (1), (2), (2), (2), (3), (4) as tab(col)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_sketch_agg(col)) AS result FROM t1
+-- !query schema
+struct<result:bigint>
+-- !query output
+5
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_sketch_agg(col, 12))
+FROM VALUES (50), (60), (60), (60), (75), (100) tab(col)
+-- !query schema
+struct<hll_sketch_estimate(hll_sketch_agg(col, 12)):bigint>
+-- !query output
+4
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_sketch_agg(col))
+FROM VALUES ('abc'), ('def'), ('abc'), ('ghi'), ('abc') tab(col)
+-- !query schema
+struct<hll_sketch_estimate(hll_sketch_agg(col, 12)):bigint>
+-- !query output
+3
+
+
+-- !query
+SELECT hll_sketch_estimate(
+ hll_union(
+ hll_sketch_agg(col1),
+ hll_sketch_agg(col2)))
+ FROM VALUES
+ (1, 4),
+ (1, 4),
+ (2, 5),
+ (2, 5),
+ (3, 6) AS tab(col1, col2)
+-- !query schema
+struct<hll_sketch_estimate(hll_union(hll_sketch_agg(col1, 12), hll_sketch_agg(col2, 12), false)):bigint>
+-- !query output
+6
+
+
+-- !query
+SELECT hll_sketch_estimate(hll_union_agg(sketch, true))
+ FROM (SELECT hll_sketch_agg(col) as sketch
+ FROM VALUES (1) AS tab(col)
+ UNION ALL
+ SELECT hll_sketch_agg(col, 20) as sketch
+ FROM VALUES (1) AS tab(col))
+-- !query schema
+struct<hll_sketch_estimate(hll_union_agg(sketch, true)):bigint>
+-- !query output
+1
+
+
+-- !query
+SELECT hll_sketch_agg(col)
+FROM VALUES (ARRAY(1, 2)), (ARRAY(3, 4)) tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"col\"",
+ "inputType" : "\"ARRAY<INT>\"",
+ "paramIndex" : "1",
+ "requiredType" : "(\"INT\" or \"BIGINT\" or \"STRING\" or \"BINARY\")",
+ "sqlExpr" : "\"hll_sketch_agg(col, 12)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 26,
+ "fragment" : "hll_sketch_agg(col)"
+ } ]
+}
+
+
+-- !query
+SELECT hll_sketch_agg(col, 2)
+FROM VALUES (50), (60), (60) tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "HLL_INVALID_LG_K",
+ "messageParameters" : {
+ "function" : "`hll_sketch_agg`",
+ "max" : "21",
+ "min" : "4",
+ "value" : "2"
+ }
+}
+
+
+-- !query
+SELECT hll_sketch_agg(col, 40)
+FROM VALUES (50), (60), (60) tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "HLL_INVALID_LG_K",
+ "messageParameters" : {
+ "function" : "`hll_sketch_agg`",
+ "max" : "21",
+ "min" : "4",
+ "value" : "40"
+ }
+}
+
+
+-- !query
+SELECT hll_union(
+ hll_sketch_agg(col1, 12),
+ hll_sketch_agg(col2, 13))
+ FROM VALUES
+ (1, 4),
+ (1, 4),
+ (2, 5),
+ (2, 5),
+ (3, 6) AS tab(col1, col2)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "HLL_UNION_DIFFERENT_LG_K",
+ "messageParameters" : {
+ "function" : "`hll_union`",
+ "left" : "12",
+ "right" : "13"
+ }
+}
+
+
+-- !query
+SELECT hll_union_agg(sketch, false)
+FROM (SELECT hll_sketch_agg(col, 12) as sketch
+ FROM VALUES (1) AS tab(col)
+ UNION ALL
+ SELECT hll_sketch_agg(col, 20) as sketch
+ FROM VALUES (1) AS tab(col))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "HLL_UNION_DIFFERENT_LG_K",
+ "messageParameters" : {
+ "function" : "`hll_union_agg`",
+ "left" : "12",
+ "right" : "20"
+ }
+}
+
+
+-- !query
+SELECT hll_union(1, 2)
+ FROM VALUES
+ (1, 4),
+ (1, 4),
+ (2, 5),
+ (2, 5),
+ (3, 6) AS tab(col1, col2)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"1\"",
+ "inputType" : "\"INT\"",
+ "paramIndex" : "1",
+ "requiredType" : "\"BINARY\"",
+ "sqlExpr" : "\"hll_union(1, 2, false)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 22,
+ "fragment" : "hll_union(1, 2)"
+ } ]
+}
+
+
+-- !query
+SELECT hll_sketch_estimate(CAST ('abc' AS BINARY))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "HLL_INVALID_INPUT_SKETCH_BUFFER",
+ "messageParameters" : {
+ "function" : "`hll_sketch_estimate`"
+ }
+}
+
+
+-- !query
+SELECT hll_union(CAST ('abc' AS BINARY), CAST ('def' AS BINARY))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "HLL_INVALID_INPUT_SKETCH_BUFFER",
+ "messageParameters" : {
+ "function" : "`hll_union`"
+ }
+}
+
+
+-- !query
+SELECT hll_union_agg(buffer, false)
+FROM (SELECT CAST('abc' AS BINARY) AS buffer)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "HLL_INVALID_INPUT_SKETCH_BUFFER",
+ "messageParameters" : {
+ "function" : "`hll_union_agg`"
+ }
+}
+
+
+-- !query
+DROP TABLE IF EXISTS t1
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 577d86c2d9a..b6e1f5af011 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -23,7 +23,7 @@ import scala.util.Random
import org.scalatest.matchers.must.Matchers.the
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
@@ -1855,149 +1855,211 @@ class DataFrameAggregateSuite extends QueryTest
df2.createOrReplaceTempView("df2")
// validate that the functions error out when lgConfigK < 4 or > 24
- val error0 = intercept[SparkException] {
- val res = df1.groupBy("id")
- .agg(
- hll_sketch_agg("value", 1).as("hllsketch")
- )
- checkAnswer(res, Nil)
- }
- assert(error0.toString contains "SketchesArgumentException")
-
- val error1 = intercept[SparkException] {
- val res = df1.groupBy("id")
- .agg(
- hll_sketch_agg("value", 25).as("hllsketch")
- )
- checkAnswer(res, Nil)
- }
- assert(error1.toString contains "SketchesArgumentException")
+ checkError(
+ exception = intercept[SparkException] {
+ val res = df1.groupBy("id")
+ .agg(
+ hll_sketch_agg("value", 1).as("hllsketch")
+ )
+ checkAnswer(res, Nil)
+ }.getCause.asInstanceOf[SparkThrowable],
+ errorClass = "HLL_INVALID_LG_K",
+ parameters = Map(
+ "function" -> "`hll_sketch_agg`",
+ "min" -> "4",
+ "max" -> "21",
+ "value" -> "1"
+ ))
+
+ checkError(
+ exception = intercept[SparkException] {
+ val res = df1.groupBy("id")
+ .agg(
+ hll_sketch_agg("value", 25).as("hllsketch")
+ )
+ checkAnswer(res, Nil)
+ }.getCause.asInstanceOf[SparkThrowable],
+ errorClass = "HLL_INVALID_LG_K",
+ parameters = Map(
+ "function" -> "`hll_sketch_agg`",
+ "min" -> "4",
+ "max" -> "21",
+ "value" -> "25"
+ ))
// validate that unions error out by default for different lgConfigK sketches
- val error2 = intercept[SparkException] {
- val i1 = df1.groupBy("id")
- .agg(
- hll_sketch_agg("value").as("hllsketch_left")
- )
- val i2 = df2.groupBy("id")
- .agg(
- hll_sketch_agg("value", 20).as("hllsketch_right")
- )
- val res = i1.join(i2).withColumn("union", hll_union("hllsketch_left", "hllsketch_right"))
- checkAnswer(res, Nil)
- }
- assert(error2.toString contains "UnsupportedOperationException")
-
- val error3 = intercept[SparkException] {
- val i1 = df1.groupBy("id")
- .agg(
- hll_sketch_agg("value").as("hllsketch")
- )
- val i2 = df2.groupBy("id")
- .agg(
- hll_sketch_agg("value", 20).as("hllsketch")
- )
- val res = i1.union(i2).groupBy("id")
- .agg(
- hll_union_agg("hllsketch")
- )
- checkAnswer(res, Nil)
- }
- assert(error3.toString contains "UnsupportedOperationException")
+ checkError(
+ exception = intercept[SparkException] {
+ val i1 = df1.groupBy("id")
+ .agg(
+ hll_sketch_agg("value").as("hllsketch_left")
+ )
+ val i2 = df2.groupBy("id")
+ .agg(
+ hll_sketch_agg("value", 20).as("hllsketch_right")
+ )
+ val res = i1.join(i2).withColumn("union", hll_union("hllsketch_left", "hllsketch_right"))
+ checkAnswer(res, Nil)
+ }.getCause.asInstanceOf[SparkThrowable],
+ errorClass = "HLL_UNION_DIFFERENT_LG_K",
+ parameters = Map(
+ "left" -> "12",
+ "right" -> "20",
+ "function" -> "`hll_union`"
+ ))
+
+ checkError(
+ exception = intercept[SparkException] {
+ val i1 = df1.groupBy("id")
+ .agg(
+ hll_sketch_agg("value").as("hllsketch")
+ )
+ val i2 = df2.groupBy("id")
+ .agg(
+ hll_sketch_agg("value", 20).as("hllsketch")
+ )
+ val res = i1.union(i2).groupBy("id")
+ .agg(
+ hll_union_agg("hllsketch")
+ )
+ checkAnswer(res, Nil)
+ }.getCause.asInstanceOf[SparkThrowable],
+ errorClass = "HLL_UNION_DIFFERENT_LG_K",
+ parameters = Map(
+ "left" -> "12",
+ "right" -> "20",
+ "function" -> "`hll_union_agg`"
+ ))
// validate that the functions error out when provided unexpected types
- val error4 = intercept[AnalysisException] {
- val res = sql(
- """
- |select
- | id,
- | hll_sketch_agg(value, 'text')
- |from
- | df1
- |group by 1
- |""".stripMargin)
- checkAnswer(res, Nil)
- }
- assert(error4.toString contains "UNEXPECTED_INPUT_TYPE")
-
- val error5 = intercept[AnalysisException] {
- val res = sql(
- """with sketch_cte as (
- |select
- | id,
- | hll_sketch_agg(value) as sketch
- |from
- | df1
- |group by 1
- |)
- |
- |select hll_union_agg(sketch, 'Hll_4') from sketch_cte
- |""".stripMargin)
- checkAnswer(res, Nil)
- }
- assert(error5.toString contains "UNEXPECTED_INPUT_TYPE")
+ checkError(
+ exception = intercept[AnalysisException] {
+ val res = sql(
+ """
+ |select
+ | id,
+ | hll_sketch_agg(value, 'text')
+ |from
+ | df1
+ |group by 1
+ |""".stripMargin)
+ checkAnswer(res, Nil)
+ },
+ errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ parameters = Map(
+ "sqlExpr" -> "\"hll_sketch_agg(value, text)\"",
+ "paramIndex" -> "2",
+ "inputSql" -> "\"text\"",
+ "inputType" -> "\"STRING\"",
+ "requiredType" -> "\"INT\""
+ ),
+ context = ExpectedContext(
+ fragment = "hll_sketch_agg(value, 'text')",
+ start = 14,
+ stop = 42))
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ val res = sql(
+ """with sketch_cte as (
+ |select
+ | id,
+ | hll_sketch_agg(value) as sketch
+ |from
+ | df1
+ |group by 1
+ |)
+ |
+ |select hll_union_agg(sketch, 'Hll_4') from sketch_cte
+ |""".stripMargin)
+ checkAnswer(res, Nil)
+ },
+ errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ parameters = Map(
+ "sqlExpr" -> "\"hll_union_agg(sketch, Hll_4)\"",
+ "paramIndex" -> "2",
+ "inputSql" -> "\"Hll_4\"",
+ "inputType" -> "\"STRING\"",
+ "requiredType" -> "\"BOOLEAN\""
+ ),
+ context = ExpectedContext(
+ fragment = "hll_union_agg(sketch, 'Hll_4')",
+ start = 97,
+ stop = 126))
// validate that unions error out by default for different lgConfigK sketches
- val error6 = intercept[SparkException] {
- val res = sql(
- """with cte1 as (
- |select
- | id,
- | hll_sketch_agg(value) as sketch
- |from
- | df1
- |group by 1
- |),
- |
- |cte2 as (
- |select
- | id,
- | hll_sketch_agg(value, 20) as sketch
- |from
- | df2
- |group by 1
- |)
- |
- |select
- | cte1.id,
- | hll_union(cte1.sketch, cte2.sketch) as sketch
- |from
- | cte1 join cte2 on cte1.id = cte2.id
- |""".stripMargin)
- checkAnswer(res, Nil)
- }
- assert(error6.toString contains "UnsupportedOperationException")
-
- val error7 = intercept[SparkException] {
- val res = sql(
- """with cte1 as (
- |select
- | id,
- | hll_sketch_agg(value) as sketch
- |from
- | df1
- |group by 1
- |),
- |
- |cte2 as (
- |select
- | id,
- | hll_sketch_agg(value, 20) as sketch
- |from
- | df2
- |group by 1
- |)
- |
- |select
- | id,
- | hll_union_agg(sketch) as sketch
- |from
- | (select * from cte1 union all select * from cte2)
- |group by 1
- |""".stripMargin)
- checkAnswer(res, Nil)
- }
- assert(error7.toString contains "UnsupportedOperationException")
+ checkError(
+ exception = intercept[SparkException] {
+ val res = sql(
+ """with cte1 as (
+ |select
+ | id,
+ | hll_sketch_agg(value) as sketch
+ |from
+ | df1
+ |group by 1
+ |),
+ |
+ |cte2 as (
+ |select
+ | id,
+ | hll_sketch_agg(value, 20) as sketch
+ |from
+ | df2
+ |group by 1
+ |)
+ |
+ |select
+ | cte1.id,
+ | hll_union(cte1.sketch, cte2.sketch) as sketch
+ |from
+ | cte1 join cte2 on cte1.id = cte2.id
+ |""".stripMargin)
+ checkAnswer(res, Nil)
+ }.getCause.asInstanceOf[SparkThrowable],
+ errorClass = "HLL_UNION_DIFFERENT_LG_K",
+ parameters = Map(
+ "left" -> "12",
+ "right" -> "20",
+ "function" -> "`hll_union`"
+ ))
+
+ checkError(
+ exception = intercept[SparkException] {
+ val res = sql(
+ """with cte1 as (
+ |select
+ | id,
+ | hll_sketch_agg(value) as sketch
+ |from
+ | df1
+ |group by 1
+ |),
+ |
+ |cte2 as (
+ |select
+ | id,
+ | hll_sketch_agg(value, 20) as sketch
+ |from
+ | df2
+ |group by 1
+ |)
+ |
+ |select
+ | id,
+ | hll_union_agg(sketch) as sketch
+ |from
+ | (select * from cte1 union all select * from cte2)
+ |group by 1
+ |""".stripMargin)
+ checkAnswer(res, Nil)
+ }.getCause.asInstanceOf[SparkThrowable],
+ errorClass = "HLL_UNION_DIFFERENT_LG_K",
+ parameters = Map(
+ "left" -> "12",
+ "right" -> "20",
+ "function" -> "`hll_union_agg`"
+ ))
}
test("SPARK-43876: Enable fast hashmap for distinct queries") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org