You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "mkaravel (via GitHub)" <gi...@apache.org> on 2023/04/20 16:51:03 UTC

[GitHub] [spark] mkaravel commented on a diff in pull request #40615: [SPARK-16484][SQL] Add support for Datasketches HllSketch

mkaravel commented on code in PR #40615:
URL: https://github.com/apache/spark/pull/40615#discussion_r1172824899


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+
+import java.util.Locale
+
+import org.apache.datasketches.SketchesArgumentException
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.Memory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, TernaryLike}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, BinaryType, DataType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * The HllSketchAgg function utilizes a Datasketches HllSketch instance to
+ * probabilistically count the number of unique values in a given column, and
+ * outputs the binary representation of the HllSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ *
+ * @param child child expression against which unique counting will occur
+ * @param lgConfigK the log-base-2 of K, where K is the number of buckets or slots for the sketch
+ * @param tgtHllType the target type of the HllSketch to be used (HLL_4, HLL_6, HLL_8)
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr, lgConfigK, tgtHllType) - Returns the HllSketch's compact binary representation.
+      `lgConfigK` (optional) the log-base-2 of K, with K = the number of buckets for the HllSketch.
+      `tgtHllType` (optional) the target type of the HllSketch (HLL_4, HLL_6, HLL_8). """,
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(col1, 12, 'HLL_4'))
+      FROM VALUES (1), (1), (2), (2), (3) tab(col1);
+       3
+  """,
+  group = "agg_funcs",
+  since = "3.5.0")
+case class HllSketchAgg(
+    child: Expression,
+    lgConfigKExpression: Expression,
+    tgtHllTypeExpression: Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends TypedImperativeAggregate[HllSketch] with TernaryLike[Expression] with ExpectsInputTypes {
+
+  // Hllsketch config - mark as lazy so that they're not evaluated during tree transformation.
+
+  lazy val lgConfigK: Int = second.eval().asInstanceOf[Int]
+  lazy val tgtHllType: TgtHllType = try {
+    TgtHllType.valueOf(third.eval().asInstanceOf[UTF8String].toString.toUpperCase(Locale.ROOT))
+  } catch {
+    case _: IllegalArgumentException =>
+      throw new SketchesArgumentException("Invalid tgtHllType value")
+  }
+
+  // Constructors
+
+  def this(child: Expression) = {
+    this(child, Literal(HllSketch.DEFAULT_LG_K), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Expression) = {
+    this(child, lgConfigK, Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Int) = {
+    this(child, Literal(lgConfigK), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Expression, tgtHllType: Expression) = {
+    this(child, lgConfigK, tgtHllType, 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Int, tgtHllType: String) = {
+    this(child, Literal(lgConfigK), Literal(tgtHllType), 0, 0)
+  }
+
+  // Copy constructors required by ImperativeAggregate
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): HllSketchAgg =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): HllSketchAgg =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override protected def withNewChildrenInternal(newFirst: Expression,
+                                              newSecond: Expression,
+                                              newThird: Expression): HllSketchAgg =
+    copy(child = newFirst, lgConfigKExpression = newSecond, tgtHllTypeExpression = newThird)
+
+  // Overrides for TernaryLike
+
+  override def first: Expression = child
+
+  override def second: Expression = lgConfigKExpression
+
+  override def third: Expression = tgtHllTypeExpression
+
+  // Overrides for TypedImperativeAggregate
+
+  override def prettyName: String = "hll_sketch_agg"
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, StringType)
+
+  override def dataType: DataType = BinaryType
+
+  override def nullable: Boolean = false
+
+  /**
+   * Instantiate an HllSketch instance using the lgConfigK and tgtHllType params.
+   *
+   * @return an HllSketch instance
+   */
+  override def createAggregationBuffer(): HllSketch = {
+    new HllSketch(lgConfigK, tgtHllType)
+  }
+
+  /**
+   * Evaluate the input row and update the HllSketch instance with the row's value.
+   * The update function only supports a subset of Spark SQL types, and an
+   * UnsupportedOperationException will be thrown for unsupported types.
+   *
+   * @param sketch The HllSketch instance.
+   * @param input  an input row
+   */
+  override def update(sketch: HllSketch, input: InternalRow): HllSketch = {
+    val v = first.eval(input)
+    if (v != null) {
+      first.dataType match {
+        // Update implemented for a subset of types supported by HllSketch
+        // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out
+        // Leaving out support for Array types, as unique counting these aren't a common use case
+        // Leaving out support for floating point types (IE DoubleType) due to imprecision
+        // TODO: implement support for decimal/datetime/interval types
+        case IntegerType => sketch.update(v.asInstanceOf[Int])
+        case LongType => sketch.update(v.asInstanceOf[Long])
+        case StringType => sketch.update(v.asInstanceOf[UTF8String].toString)
+        case BinaryType => sketch.update(v.asInstanceOf[Array[Byte]])
+        case dataType => throw new UnsupportedOperationException(
+          s"A HllSketch instance cannot be updates with a Spark ${dataType.toString} type")
+      }
+    }
+    sketch
+  }
+
+  /**
+   * Merges an input HllSketch into the sketch which is acting as the aggregation buffer.
+   *
+   * @param sketch the HllSketch instance used to store the aggregation result.
+   * @param input an input HllSketch instance
+   */
+  override def merge(sketch: HllSketch, input: HllSketch): HllSketch = {
+    val union = new Union(sketch.getLgConfigK)
+    union.update(sketch)
+    union.update(input)
+    union.getResult(sketch.getTgtHllType)
+  }
+
+  /**
+   * Returns an HllSketch derived from the input column or expression
+   *
+   * @param sketch HllSketch instance used as an aggregation buffer
+   * @return A binary sketch which can be evaluated or merged
+   */
+  override def eval(sketch: HllSketch): Any = {
+    sketch.toCompactByteArray
+  }
+
+  /** Convert the underlying HllSketch into an updatable byte array  */
+  override def serialize(sketch: HllSketch): Array[Byte] = {
+    sketch.toUpdatableByteArray
+  }
+
+  /** De-serializes the updatable byte array into a HllSketch instance */
+  override def deserialize(buffer: Array[Byte]): HllSketch = {
+    HllSketch.heapify(buffer)
+  }
+}
+
+/**
+ * The HllUnionAgg function ingests and merges Datasketches HllSketch
+ * instances previously produced by the HllSketchBinary function, and
+ * outputs the merged HllSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ *
+ * @param child child expression against which unique counting will occur
+ * @param lgMaxK The largest maximum size for lgConfigK for the union operation.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr, lgMaxK) - Returns the estimated number of unique values.
+      `lgMaxK` (optional) The largest maximum size for lgConfigK for the union operation.""",
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(sketch, 12))
+      FROM (
+        SELECT hll_sketch_agg(col1) as sketch FROMVALUES (1), (1), (2), (2), (3) tab(col1)
+        UNION ALL
+        SELECT hll_sketch_agg(col1) as sketch FROMVALUES (4), (4), (5), (5), (6) tab(col1)
+      );
+       3
+  """,
+  group = "agg_funcs",
+  since = "3.5.0")
+case class HllUnionAgg(
+    child: Expression,
+    lgMaxKExpression: Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends TypedImperativeAggregate[Union] with BinaryLike[Expression] with ExpectsInputTypes {
+
+  // Union config - mark as lazy so that they're not evaluated during tree transformation.
+
+  lazy val lgMaxK: Int = lgMaxKExpression.eval().asInstanceOf[Int]
+
+  // Constructors
+
+  def this(child: Expression) = {
+    this(child, Literal(HllSketch.DEFAULT_LG_K), 0, 0)
+  }
+
+  def this(child: Expression, lgMaxK: Expression) = {
+    this(child, lgMaxK, 0, 0)
+  }
+
+  def this(child: Expression, lgMaxK: Int) = {
+    this(child, Literal(lgMaxK), 0, 0)
+  }
+
+  // Copy constructors required by ImperativeAggregate
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int):
+  HllUnionAgg = copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): HllUnionAgg =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression):
+  HllUnionAgg = copy(child = newLeft, lgMaxKExpression = newRight)
+
+  // Overrides for BinaryLike
+
+  override def left: Expression = child
+
+  override def right: Expression = lgMaxKExpression
+
+  // Overrides for TypedImperativeAggregate
+
+  override def prettyName: String = "hll_union_agg"
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType, IntegerType)
+
+  override def dataType: DataType = BinaryType
+
+  override def nullable: Boolean = false
+
+  /**
+   * Instantiate an Union instance using the lgMaxK param.
+   *
+   * @return an Union instance
+   */
+  override def createAggregationBuffer(): Union = {
+    new Union(lgMaxK)
+  }
+
+  /**
+   * Update the Union instance with the HllSketch byte array obtained from the row.
+   *
+   * @param union The Union instance.
+   * @param input an input row
+   */
+  override def update(union: Union, input: InternalRow): Union = {
+    val v = child.eval(input)
+    if (v != null) {
+      child.dataType match {
+        case BinaryType =>
+          union.update(HllSketch.wrap(Memory.wrap(v.asInstanceOf[Array[Byte]])))
+        case _ => throw new UnsupportedOperationException(
+          s"A Union instance can only be updated with a valid HllSketch byte array")

Review Comment:
   Do we need this?
   Since the aggregate expression is defined with `ExpectsInputTypes`, it should never be the case that we have anything other than a BINARY value.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+
+import java.util.Locale
+
+import org.apache.datasketches.SketchesArgumentException
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.Memory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, TernaryLike}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, BinaryType, DataType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * The HllSketchAgg function utilizes a Datasketches HllSketch instance to
+ * probabilistically count the number of unique values in a given column, and
+ * outputs the binary representation of the HllSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ *
+ * @param child child expression against which unique counting will occur
+ * @param lgConfigK the log-base-2 of K, where K is the number of buckets or slots for the sketch
+ * @param tgtHllType the target type of the HllSketch to be used (HLL_4, HLL_6, HLL_8)
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr, lgConfigK, tgtHllType) - Returns the HllSketch's compact binary representation.
+      `lgConfigK` (optional) the log-base-2 of K, with K = the number of buckets for the HllSketch.
+      `tgtHllType` (optional) the target type of the HllSketch (HLL_4, HLL_6, HLL_8). """,
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(col1, 12, 'HLL_4'))
+      FROM VALUES (1), (1), (2), (2), (3) tab(col1);
+       3
+  """,
+  group = "agg_funcs",
+  since = "3.5.0")
+case class HllSketchAgg(
+    child: Expression,
+    lgConfigKExpression: Expression,
+    tgtHllTypeExpression: Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends TypedImperativeAggregate[HllSketch] with TernaryLike[Expression] with ExpectsInputTypes {
+
+  // Hllsketch config - mark as lazy so that they're not evaluated during tree transformation.
+
+  lazy val lgConfigK: Int = second.eval().asInstanceOf[Int]
+  lazy val tgtHllType: TgtHllType = try {
+    TgtHllType.valueOf(third.eval().asInstanceOf[UTF8String].toString.toUpperCase(Locale.ROOT))
+  } catch {
+    case _: IllegalArgumentException =>
+      throw new SketchesArgumentException("Invalid tgtHllType value")
+  }
+
+  // Constructors
+
+  def this(child: Expression) = {
+    this(child, Literal(HllSketch.DEFAULT_LG_K), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Expression) = {
+    this(child, lgConfigK, Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Int) = {
+    this(child, Literal(lgConfigK), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Expression, tgtHllType: Expression) = {
+    this(child, lgConfigK, tgtHllType, 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Int, tgtHllType: String) = {
+    this(child, Literal(lgConfigK), Literal(tgtHllType), 0, 0)
+  }
+
+  // Copy constructors required by ImperativeAggregate
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): HllSketchAgg =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): HllSketchAgg =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override protected def withNewChildrenInternal(newFirst: Expression,
+                                              newSecond: Expression,
+                                              newThird: Expression): HllSketchAgg =
+    copy(child = newFirst, lgConfigKExpression = newSecond, tgtHllTypeExpression = newThird)
+
+  // Overrides for TernaryLike
+
+  override def first: Expression = child
+
+  override def second: Expression = lgConfigKExpression
+
+  override def third: Expression = tgtHllTypeExpression
+
+  // Overrides for TypedImperativeAggregate
+
+  override def prettyName: String = "hll_sketch_agg"
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, StringType)
+
+  override def dataType: DataType = BinaryType
+
+  override def nullable: Boolean = false
+
+  /**
+   * Instantiate an HllSketch instance using the lgConfigK and tgtHllType params.
+   *
+   * @return an HllSketch instance
+   */
+  override def createAggregationBuffer(): HllSketch = {
+    new HllSketch(lgConfigK, tgtHllType)
+  }
+
+  /**
+   * Evaluate the input row and update the HllSketch instance with the row's value.
+   * The update function only supports a subset of Spark SQL types, and an
+   * UnsupportedOperationException will be thrown for unsupported types.
+   *
+   * @param sketch The HllSketch instance.
+   * @param input  an input row
+   */
+  override def update(sketch: HllSketch, input: InternalRow): HllSketch = {
+    val v = first.eval(input)
+    if (v != null) {
+      first.dataType match {
+        // Update implemented for a subset of types supported by HllSketch
+        // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out
+        // Leaving out support for Array types, as unique counting these aren't a common use case
+        // Leaving out support for floating point types (IE DoubleType) due to imprecision
+        // TODO: implement support for decimal/datetime/interval types
+        case IntegerType => sketch.update(v.asInstanceOf[Int])
+        case LongType => sketch.update(v.asInstanceOf[Long])
+        case StringType => sketch.update(v.asInstanceOf[UTF8String].toString)
+        case BinaryType => sketch.update(v.asInstanceOf[Array[Byte]])
+        case dataType => throw new UnsupportedOperationException(
+          s"A HllSketch instance cannot be updates with a Spark ${dataType.toString} type")
+      }
+    }
+    sketch
+  }
+
+  /**
+   * Merges an input HllSketch into the sketch which is acting as the aggregation buffer.
+   *
+   * @param sketch the HllSketch instance used to store the aggregation result.
+   * @param input an input HllSketch instance
+   */
+  override def merge(sketch: HllSketch, input: HllSketch): HllSketch = {
+    val union = new Union(sketch.getLgConfigK)
+    union.update(sketch)
+    union.update(input)
+    union.getResult(sketch.getTgtHllType)
+  }
+
+  /**
+   * Returns an HllSketch derived from the input column or expression
+   *
+   * @param sketch HllSketch instance used as an aggregation buffer
+   * @return A binary sketch which can be evaluated or merged
+   */
+  override def eval(sketch: HllSketch): Any = {
+    sketch.toCompactByteArray
+  }
+
+  /** Convert the underlying HllSketch into an updatable byte array  */
+  override def serialize(sketch: HllSketch): Array[Byte] = {
+    sketch.toUpdatableByteArray
+  }
+
+  /** De-serializes the updatable byte array into a HllSketch instance */
+  override def deserialize(buffer: Array[Byte]): HllSketch = {
+    HllSketch.heapify(buffer)
+  }
+}
+
+/**
+ * The HllUnionAgg function ingests and merges Datasketches HllSketch
+ * instances previously produced by the HllSketchBinary function, and
+ * outputs the merged HllSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ *
+ * @param child child expression against which unique counting will occur
+ * @param lgMaxK The largest maximum size for lgConfigK for the union operation.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr, lgMaxK) - Returns the estimated number of unique values.
+      `lgMaxK` (optional) The largest maximum size for lgConfigK for the union operation.""",
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(sketch, 12))
+      FROM (
+        SELECT hll_sketch_agg(col1) as sketch FROMVALUES (1), (1), (2), (2), (3) tab(col1)
+        UNION ALL
+        SELECT hll_sketch_agg(col1) as sketch FROMVALUES (4), (4), (5), (5), (6) tab(col1)
+      );
+       3
+  """,
+  group = "agg_funcs",
+  since = "3.5.0")
+case class HllUnionAgg(
+    child: Expression,
+    lgMaxKExpression: Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends TypedImperativeAggregate[Union] with BinaryLike[Expression] with ExpectsInputTypes {
+
+  // Union config - mark as lazy so that they're not evaluated during tree transformation.
+
+  lazy val lgMaxK: Int = lgMaxKExpression.eval().asInstanceOf[Int]
+
+  // Constructors
+
+  def this(child: Expression) = {
+    this(child, Literal(HllSketch.DEFAULT_LG_K), 0, 0)
+  }
+
+  def this(child: Expression, lgMaxK: Expression) = {
+    this(child, lgMaxK, 0, 0)
+  }
+
+  def this(child: Expression, lgMaxK: Int) = {
+    this(child, Literal(lgMaxK), 0, 0)
+  }
+
+  // Copy constructors required by ImperativeAggregate
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int):
+  HllUnionAgg = copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): HllUnionAgg =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression):
+  HllUnionAgg = copy(child = newLeft, lgMaxKExpression = newRight)
+
+  // Overrides for BinaryLike
+
+  override def left: Expression = child
+
+  override def right: Expression = lgMaxKExpression
+
+  // Overrides for TypedImperativeAggregate
+
+  override def prettyName: String = "hll_union_agg"
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType, IntegerType)
+
+  override def dataType: DataType = BinaryType
+
+  override def nullable: Boolean = false
+
+  /**
+   * Instantiate an Union instance using the lgMaxK param.
+   *
+   * @return an Union instance
+   */
+  override def createAggregationBuffer(): Union = {
+    new Union(lgMaxK)
+  }
+
+  /**
+   * Update the Union instance with the HllSketch byte array obtained from the row.
+   *
+   * @param union The Union instance.
+   * @param input an input row
+   */
+  override def update(union: Union, input: InternalRow): Union = {
+    val v = child.eval(input)
+    if (v != null) {
+      child.dataType match {
+        case BinaryType =>
+          union.update(HllSketch.wrap(Memory.wrap(v.asInstanceOf[Array[Byte]])))
+        case _ => throw new UnsupportedOperationException(
+          s"A Union instance can only be updated with a valid HllSketch byte array")
+      }
+    }
+    union
+  }
+
+  /**
+   * Merges an input Union into the union which is acting as the aggregation buffer.
+   *
+   * @param union the Union instance used to store the aggregation result.
+   * @param input an input Union instance
+   */
+  override def merge(union: Union, input: Union): Union = {
+    union.update(input.getResult)
+    union
+  }
+
+  /**
+   * Returns an HllSketch derived from the merged HllSketches
+   *
+   * @param union Union instance used as an aggregation buffer
+   * @return A binary sketch which can be evaluated or merged
+   */
+  override def eval(union: Union): Any = {
+    union.toCompactByteArray
+  }

Review Comment:
   Same question here: why compact representation?



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -539,6 +539,235 @@ object functions {
   def grouping_id(colName: String, colNames: String*): Column =
     grouping_id((Seq(colName) ++ colNames).map(n => Column(n)): _*)
 
+  /**
+   * Aggregate function: returns the compact binary representation of the Datasketches HllSketch
+   * configured with lgConfigK and tgtHllType args.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def hll_sketch_agg(e: Column, lgConfigK: Int, tgtHllType: String): Column =
+    Column.fn("hll_sketch_agg", e, lit(lgConfigK), lit(tgtHllType))
+
+  /**
+   * Aggregate function: returns the compact binary representation of the Datasketches HllSketch
+   * configured with lgConfigK and tgtHllType args.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def hll_sketch_agg(columnName: String, lgConfigK: Int, tgtHllType: String): Column =
+    Column.fn("hll_sketch_agg", Column(columnName), lit(lgConfigK), lit(tgtHllType))
+
+  /**
+   * Aggregate function: returns the compact binary representation of the Datasketches HllSketch
+   * configured with lgConfigK arg and default tgtHllType value.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def hll_sketch_agg(e: Column, lgConfigK: Int): Column =
+    Column.fn("hll_sketch_agg", e, lit(lgConfigK))
+
+  /**
+   * Aggregate function: returns the compact binary representation of the Datasketches HllSketch
+   * configured with lgConfigK arg and default tgtHllType value.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def hll_sketch_agg(columnName: String, lgConfigK: Int): Column =
+    Column.fn("hll_sketch_agg", Column(columnName), lit(lgConfigK))
+
+  /**
+   * Aggregate function: returns the compact binary representation of the Datasketches HllSketch
+   * configured with default lgConfigK and tgtHllType values.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def hll_sketch_agg(e: Column): Column =
+    Column.fn("hll_sketch_agg", e)
+
+  /**
+   * Aggregate function: returns the compact binary representation of the Datasketches HllSketch
+   * configured with default lgConfigK and tgtHllType values.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def hll_sketch_agg(columnName: String): Column =
+    Column.fn("hll_sketch_agg", Column(columnName))
+
+  /**
+   * Aggregate function: returns the compact binary representation of the Datasketches HllSketch,
+   * generated by merging previously created Datasketches HllSketch instances via a Datasketches
+   * Union instance configured with lgMaxK arg.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def hll_union_agg(e: Column, lgMaxK: Int): Column =
+    Column.fn("hll_union_agg", e, lit(lgMaxK))
+
+  /**
+   * Aggregate function: returns the compact binary representation of the Datasketches HllSketch,
+   * generated by merging previously created Datasketches HllSketch instances via a Datasketches
+   * Union instance configured with lgMaxK arg.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def hll_union_agg(columnName: String, lgMaxK: Int): Column =
+    Column.fn("hll_union_agg", Column(columnName), lit(lgMaxK))
+
+  /**
+   * Aggregate function: returns the compact binary representation of the Datasketches HllSketch,
+   * generated by merging previously created Datasketches HllSketch instances via a Datasketches
+   * Union instance configured with default lgMaxK arg.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def hll_union_agg(e: Column): Column =
+    Column.fn("hll_union_agg", e)
+
+  /**
+   * Aggregate function: returns the compact binary representation of the Datasketches HllSketch,
+   * generated by merging previously created Datasketches HllSketch instances via a Datasketches
+   * Union instance configured with default lgMaxK arg.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def hll_union_agg(columnName: String): Column =
+    Column.fn("hll_union_agg", Column(columnName))
+
+  /**
+   * Aggregate function: returns the estimated number of unique items in a group, using a
+   * Datasketches HllSketch instance configured with lgConfigK and tgtHllType args.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def hllsketch_estimate(columnName: String, lgConfigK: Int, tgtHllType: String): Column = {
+    Column.fn("hllsketch_estimate", Column(columnName), lit(lgConfigK), lit(tgtHllType))
+  }

Review Comment:
   Here and below (aggregate section): do we still have these functions? I believe they have changed, including their names.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+
+import java.util.Locale
+
+import org.apache.datasketches.SketchesArgumentException
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.Memory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, TernaryLike}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, BinaryType, DataType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * The HllSketchAgg function utilizes a Datasketches HllSketch instance to
+ * probabilistically count the number of unique values in a given column, and
+ * outputs the compact binary representation of the HllSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ *
+ * @param child child expression against which unique counting will occur
+ * @param lgConfigKExpression expression evaluated once to obtain lgConfigK, the log-base-2 of K,
+ *                            where K is the number of buckets or slots for the sketch
+ * @param tgtHllTypeExpression expression evaluated once to obtain the target type of the
+ *                             HllSketch (HLL_4, HLL_6, HLL_8); the name is upper-cased before
+ *                             it is looked up
+ * @param mutableAggBufferOffset buffer offset, replaced through withNewMutableAggBufferOffset
+ * @param inputAggBufferOffset buffer offset, replaced through withNewInputAggBufferOffset
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr, lgConfigK, tgtHllType) - Returns the HllSketch's compact binary representation.
+      `lgConfigK` (optional) the log-base-2 of K, with K = the number of buckets for the HllSketch.
+      `tgtHllType` (optional) the target type of the HllSketch (HLL_4, HLL_6, HLL_8). """,
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(col1, 12, 'HLL_4'))
+      FROM VALUES (1), (1), (2), (2), (3) tab(col1);
+       3
+  """,
+  group = "agg_funcs",
+  since = "3.5.0")
+case class HllSketchAgg(
+    child: Expression,
+    lgConfigKExpression: Expression,
+    tgtHllTypeExpression: Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends TypedImperativeAggregate[HllSketch] with TernaryLike[Expression] with ExpectsInputTypes {
+
+  // Hllsketch config - mark as lazy so that they're not evaluated during tree transformation.
+
+  lazy val lgConfigK: Int = second.eval().asInstanceOf[Int]
+
+  // A null (or non-string) type argument is reported exactly like an unrecognised type name,
+  // instead of escaping as a NullPointerException / ClassCastException from the conversion.
+  lazy val tgtHllType: TgtHllType = third.eval() match {
+    case typeName: UTF8String =>
+      try {
+        TgtHllType.valueOf(typeName.toString.toUpperCase(Locale.ROOT))
+      } catch {
+        case _: IllegalArgumentException =>
+          throw new SketchesArgumentException("Invalid tgtHllType value")
+      }
+    case _ =>
+      throw new SketchesArgumentException("Invalid tgtHllType value")
+  }
+
+  // Constructors
+
+  def this(child: Expression) = {
+    this(child, Literal(HllSketch.DEFAULT_LG_K), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Expression) = {
+    this(child, lgConfigK, Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Int) = {
+    this(child, Literal(lgConfigK), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Expression, tgtHllType: Expression) = {
+    this(child, lgConfigK, tgtHllType, 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Int, tgtHllType: String) = {
+    this(child, Literal(lgConfigK), Literal(tgtHllType), 0, 0)
+  }
+
+  // Copy constructors required by ImperativeAggregate
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): HllSketchAgg =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): HllSketchAgg =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override protected def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression): HllSketchAgg =
+    copy(child = newFirst, lgConfigKExpression = newSecond, tgtHllTypeExpression = newThird)
+
+  // Overrides for TernaryLike
+
+  override def first: Expression = child
+
+  override def second: Expression = lgConfigKExpression
+
+  override def third: Expression = tgtHllTypeExpression
+
+  // Overrides for TypedImperativeAggregate
+
+  override def prettyName: String = "hll_sketch_agg"
+
+  // The first slot is AnyDataType, so the child's type is not restricted here; types that the
+  // sketch cannot ingest are only rejected later, row by row, inside update().
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, StringType)
+
+  override def dataType: DataType = BinaryType
+
+  override def nullable: Boolean = false
+
+  /**
+   * Instantiate an HllSketch instance using the lgConfigK and tgtHllType params.
+   *
+   * @return an HllSketch instance
+   */
+  override def createAggregationBuffer(): HllSketch = {
+    new HllSketch(lgConfigK, tgtHllType)
+  }
+
+  /**
+   * Evaluate the input row and update the HllSketch instance with the row's value.
+   * Null values are skipped. The update function only supports a subset of Spark SQL types,
+   * and an UnsupportedOperationException will be thrown for unsupported types.
+   *
+   * @param sketch The HllSketch instance.
+   * @param input  an input row
+   * @return the same HllSketch instance that was passed in
+   */
+  override def update(sketch: HllSketch, input: InternalRow): HllSketch = {
+    val v = first.eval(input)
+    if (v != null) {
+      first.dataType match {
+        // Update implemented for a subset of types supported by HllSketch
+        // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out
+        // Leaving out support for Array types, as unique counting these aren't a common use case
+        // Leaving out support for floating point types (IE DoubleType) due to imprecision
+        // TODO: implement support for decimal/datetime/interval types
+        case IntegerType => sketch.update(v.asInstanceOf[Int])
+        case LongType => sketch.update(v.asInstanceOf[Long])
+        case StringType => sketch.update(v.asInstanceOf[UTF8String].toString)
+        case BinaryType => sketch.update(v.asInstanceOf[Array[Byte]])
+        case dataType => throw new UnsupportedOperationException(
+          s"A HllSketch instance cannot be updated with a Spark ${dataType.toString} type")
+      }
+    }
+    sketch
+  }
+
+  /**
+   * Merges an input HllSketch into the sketch which is acting as the aggregation buffer.
+   * A new Union sized by the buffer's lgConfigK is fed both sketches, and its result is
+   * returned in the buffer's target HLL type.
+   *
+   * @param sketch the HllSketch instance used to store the aggregation result.
+   * @param input an input HllSketch instance
+   * @return the HllSketch produced by the union of both arguments
+   */
+  override def merge(sketch: HllSketch, input: HllSketch): HllSketch = {
+    val union = new Union(sketch.getLgConfigK)
+    union.update(sketch)
+    union.update(input)
+    union.getResult(sketch.getTgtHllType)
+  }
+
+  /**
+   * Returns an HllSketch derived from the input column or expression
+   *
+   * @param sketch HllSketch instance used as an aggregation buffer
+   * @return A binary sketch which can be evaluated or merged
+   */
+  override def eval(sketch: HllSketch): Any = {
+    sketch.toCompactByteArray
+  }
+
+  /** Convert the underlying HllSketch into an updatable byte array  */
+  override def serialize(sketch: HllSketch): Array[Byte] = {
+    sketch.toUpdatableByteArray
+  }
+
+  /** De-serializes the updatable byte array into a HllSketch instance */
+  override def deserialize(buffer: Array[Byte]): HllSketch = {
+    HllSketch.heapify(buffer)
+  }
+}
+
+/**
+ * The HllUnionAgg function ingests and merges Datasketches HllSketch
+ * instances previously produced by the HllSketchAgg function, and
+ * outputs the merged HllSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ *
+ * @param child child expression against which unique counting will occur
+ * @param lgMaxK The largest maximum size for lgConfigK for the union operation.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr, lgMaxK) - Returns the estimated number of unique values.
+      `lgMaxK` (optional) The largest maximum size for lgConfigK for the union operation.""",
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(sketch, 12))
+      FROM (
+        SELECT hll_sketch_agg(col1) as sketch FROMVALUES (1), (1), (2), (2), (3) tab(col1)
+        UNION ALL
+        SELECT hll_sketch_agg(col1) as sketch FROMVALUES (4), (4), (5), (5), (6) tab(col1)
+      );
+       3
+  """,
+  group = "agg_funcs",
+  since = "3.5.0")
+case class HllUnionAgg(
+    child: Expression,
+    lgMaxKExpression: Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)

Review Comment:
   The union operation does not take a register size as input (unlike the sketch creation aggregate function, which does). In this case the DataSketches default choice will be used (4 bits per register). I think this could be problematic in terms of user expectations. To be honest, I think that exposing the register size in the sketch creation aggregate function in the first place is too much detail, and I would always use 8 bits per register, as this is the fastest choice (opting for performance over memory consumption here).



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.datasketches.hll.{HllSketch, Union}
+import org.apache.datasketches.memory.Memory
+
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, LongType}
+
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr) - Returns the estimated number of unique values given the binary representation
+    of a Datasketches HllSketch. """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(hll_sketch_agg(col1)
+      FROM VALUES (1), (1), (2), (2), (3) tab(col1);
+       3
+  """,
+  group = "misc_funcs",
+  since = "3.5.0")
+case class HllSketchEstimate(child: Expression)
+  extends UnaryExpression
+    with CodegenFallback

Review Comment:
   Why no codegen? It should be very easy to implement.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+
+import java.util.Locale
+
+import org.apache.datasketches.SketchesArgumentException
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.Memory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, TernaryLike}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, BinaryType, DataType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * The HllSketchAgg function utilizes a Datasketches HllSketch instance to
+ * probabilistically count the number of unique values in a given column, and
+ * outputs the binary representation of the HllSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ *
+ * @param child child expression against which unique counting will occur
+ * @param lgConfigK the log-base-2 of K, where K is the number of buckets or slots for the sketch
+ * @param tgtHllType the target type of the HllSketch to be used (HLL_4, HLL_6, HLL_8)
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr, lgConfigK, tgtHllType) - Returns the HllSketch's compact binary representation.
+      `lgConfigK` (optional) the log-base-2 of K, with K = the number of buckets for the HllSketch.
+      `tgtHllType` (optional) the target type of the HllSketch (HLL_4, HLL_6, HLL_8). """,
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(col1, 12, 'HLL_4'))
+      FROM VALUES (1), (1), (2), (2), (3) tab(col1);
+       3
+  """,
+  group = "agg_funcs",
+  since = "3.5.0")
+case class HllSketchAgg(
+    child: Expression,
+    lgConfigKExpression: Expression,
+    tgtHllTypeExpression: Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends TypedImperativeAggregate[HllSketch] with TernaryLike[Expression] with ExpectsInputTypes {
+
+  // Hllsketch config - mark as lazy so that they're not evaluated during tree transformation.
+
+  lazy val lgConfigK: Int = second.eval().asInstanceOf[Int]
+  lazy val tgtHllType: TgtHllType = try {
+    TgtHllType.valueOf(third.eval().asInstanceOf[UTF8String].toString.toUpperCase(Locale.ROOT))
+  } catch {
+    case _: IllegalArgumentException =>
+      throw new SketchesArgumentException("Invalid tgtHllType value")
+  }
+
+  // Constructors
+
+  def this(child: Expression) = {
+    this(child, Literal(HllSketch.DEFAULT_LG_K), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Expression) = {
+    this(child, lgConfigK, Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Int) = {
+    this(child, Literal(lgConfigK), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Expression, tgtHllType: Expression) = {
+    this(child, lgConfigK, tgtHllType, 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Int, tgtHllType: String) = {
+    this(child, Literal(lgConfigK), Literal(tgtHllType), 0, 0)
+  }
+
+  // Copy constructors required by ImperativeAggregate
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): HllSketchAgg =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): HllSketchAgg =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override protected def withNewChildrenInternal(newFirst: Expression,
+                                              newSecond: Expression,
+                                              newThird: Expression): HllSketchAgg =
+    copy(child = newFirst, lgConfigKExpression = newSecond, tgtHllTypeExpression = newThird)
+
+  // Overrides for TernaryLike
+
+  override def first: Expression = child
+
+  override def second: Expression = lgConfigKExpression
+
+  override def third: Expression = tgtHllTypeExpression
+
+  // Overrides for TypedImperativeAggregate
+
+  override def prettyName: String = "hll_sketch_agg"
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, StringType)
+
+  override def dataType: DataType = BinaryType
+
+  override def nullable: Boolean = false
+
+  /**
+   * Instantiate an HllSketch instance using the lgConfigK and tgtHllType params.
+   *
+   * @return an HllSketch instance
+   */
+  override def createAggregationBuffer(): HllSketch = {
+    new HllSketch(lgConfigK, tgtHllType)
+  }
+
+  /**
+   * Evaluate the input row and update the HllSketch instance with the row's value.
+   * The update function only supports a subset of Spark SQL types, and an
+   * UnsupportedOperationException will be thrown for unsupported types.
+   *
+   * @param sketch The HllSketch instance.
+   * @param input  an input row
+   */
+  override def update(sketch: HllSketch, input: InternalRow): HllSketch = {
+    val v = first.eval(input)
+    if (v != null) {
+      first.dataType match {
+        // Update implemented for a subset of types supported by HllSketch
+        // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out
+        // Leaving out support for Array types, as unique counting these aren't a common use case
+        // Leaving out support for floating point types (IE DoubleType) due to imprecision
+        // TODO: implement support for decimal/datetime/interval types
+        case IntegerType => sketch.update(v.asInstanceOf[Int])
+        case LongType => sketch.update(v.asInstanceOf[Long])
+        case StringType => sketch.update(v.asInstanceOf[UTF8String].toString)
+        case BinaryType => sketch.update(v.asInstanceOf[Array[Byte]])
+        case dataType => throw new UnsupportedOperationException(
+          s"A HllSketch instance cannot be updates with a Spark ${dataType.toString} type")

Review Comment:
   Do we really need this (see my other comment)? Shouldn't we just list the allowed input data types and stop there?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+
+import java.util.Locale
+
+import org.apache.datasketches.SketchesArgumentException
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.Memory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, TernaryLike}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, BinaryType, DataType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * The HllSketchAgg function utilizes a Datasketches HllSketch instance to
+ * probabilistically count the number of unique values in a given column, and
+ * outputs the binary representation of the HllSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ *
+ * @param child child expression against which unique counting will occur
+ * @param lgConfigK the log-base-2 of K, where K is the number of buckets or slots for the sketch
+ * @param tgtHllType the target type of the HllSketch to be used (HLL_4, HLL_6, HLL_8)
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr, lgConfigK, tgtHllType) - Returns the HllSketch's compact binary representation.
+      `lgConfigK` (optional) the log-base-2 of K, with K = the number of buckets for the HllSketch.
+      `tgtHllType` (optional) the target type of the HllSketch (HLL_4, HLL_6, HLL_8). """,
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(col1, 12, 'HLL_4'))
+      FROM VALUES (1), (1), (2), (2), (3) tab(col1);
+       3
+  """,
+  group = "agg_funcs",
+  since = "3.5.0")
+case class HllSketchAgg(
+    child: Expression,
+    lgConfigKExpression: Expression,
+    tgtHllTypeExpression: Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends TypedImperativeAggregate[HllSketch] with TernaryLike[Expression] with ExpectsInputTypes {
+
+  // Hllsketch config - mark as lazy so that they're not evaluated during tree transformation.
+
+  lazy val lgConfigK: Int = second.eval().asInstanceOf[Int]
+  lazy val tgtHllType: TgtHllType = try {
+    TgtHllType.valueOf(third.eval().asInstanceOf[UTF8String].toString.toUpperCase(Locale.ROOT))
+  } catch {
+    case _: IllegalArgumentException =>
+      throw new SketchesArgumentException("Invalid tgtHllType value")
+  }
+
+  // Constructors
+
+  def this(child: Expression) = {
+    this(child, Literal(HllSketch.DEFAULT_LG_K), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Expression) = {
+    this(child, lgConfigK, Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Int) = {
+    this(child, Literal(lgConfigK), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Expression, tgtHllType: Expression) = {
+    this(child, lgConfigK, tgtHllType, 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Int, tgtHllType: String) = {
+    this(child, Literal(lgConfigK), Literal(tgtHllType), 0, 0)
+  }
+
+  // Copy constructors required by ImperativeAggregate
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): HllSketchAgg =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): HllSketchAgg =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override protected def withNewChildrenInternal(newFirst: Expression,
+                                              newSecond: Expression,
+                                              newThird: Expression): HllSketchAgg =
+    copy(child = newFirst, lgConfigKExpression = newSecond, tgtHllTypeExpression = newThird)
+
+  // Overrides for TernaryLike
+
+  override def first: Expression = child
+
+  override def second: Expression = lgConfigKExpression
+
+  override def third: Expression = tgtHllTypeExpression
+
+  // Overrides for TypedImperativeAggregate
+
+  override def prettyName: String = "hll_sketch_agg"
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, StringType)

Review Comment:
   The aggregate expression is defined with `ExpectsInputTypes`. Why do we add `AnyDataType` here and then check the type, instead of explicitly defining the allowed data types here?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+
+import java.util.Locale
+
+import org.apache.datasketches.SketchesArgumentException
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.Memory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, TernaryLike}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, BinaryType, DataType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * The HllSketchAgg function utilizes a Datasketches HllSketch instance to
+ * probabilistically count the number of unique values in a given column, and
+ * outputs the compact binary representation of the HllSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ *
+ * @param child child expression against which unique counting will occur
+ * @param lgConfigKExpression expression evaluated once to obtain lgConfigK, the log-base-2 of K,
+ *                            where K is the number of buckets or slots for the sketch
+ * @param tgtHllTypeExpression expression evaluated once to obtain the target type of the
+ *                             HllSketch (HLL_4, HLL_6, HLL_8); the name is upper-cased before
+ *                             it is looked up
+ * @param mutableAggBufferOffset buffer offset, replaced through withNewMutableAggBufferOffset
+ * @param inputAggBufferOffset buffer offset, replaced through withNewInputAggBufferOffset
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr, lgConfigK, tgtHllType) - Returns the HllSketch's compact binary representation.
+      `lgConfigK` (optional) the log-base-2 of K, with K = the number of buckets for the HllSketch.
+      `tgtHllType` (optional) the target type of the HllSketch (HLL_4, HLL_6, HLL_8). """,
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(col1, 12, 'HLL_4'))
+      FROM VALUES (1), (1), (2), (2), (3) tab(col1);
+       3
+  """,
+  group = "agg_funcs",
+  since = "3.5.0")
+case class HllSketchAgg(
+    child: Expression,
+    lgConfigKExpression: Expression,
+    tgtHllTypeExpression: Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends TypedImperativeAggregate[HllSketch] with TernaryLike[Expression] with ExpectsInputTypes {
+
+  // Hllsketch config - mark as lazy so that they're not evaluated during tree transformation.
+
+  lazy val lgConfigK: Int = second.eval().asInstanceOf[Int]
+
+  // A null (or non-string) type argument is reported exactly like an unrecognised type name,
+  // instead of escaping as a NullPointerException / ClassCastException from the conversion.
+  lazy val tgtHllType: TgtHllType = third.eval() match {
+    case typeName: UTF8String =>
+      try {
+        TgtHllType.valueOf(typeName.toString.toUpperCase(Locale.ROOT))
+      } catch {
+        case _: IllegalArgumentException =>
+          throw new SketchesArgumentException("Invalid tgtHllType value")
+      }
+    case _ =>
+      throw new SketchesArgumentException("Invalid tgtHllType value")
+  }
+
+  // Constructors
+
+  def this(child: Expression) = {
+    this(child, Literal(HllSketch.DEFAULT_LG_K), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Expression) = {
+    this(child, lgConfigK, Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Int) = {
+    this(child, Literal(lgConfigK), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Expression, tgtHllType: Expression) = {
+    this(child, lgConfigK, tgtHllType, 0, 0)
+  }
+
+  def this(child: Expression, lgConfigK: Int, tgtHllType: String) = {
+    this(child, Literal(lgConfigK), Literal(tgtHllType), 0, 0)
+  }
+
+  // Copy constructors required by ImperativeAggregate
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): HllSketchAgg =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): HllSketchAgg =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override protected def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression): HllSketchAgg =
+    copy(child = newFirst, lgConfigKExpression = newSecond, tgtHllTypeExpression = newThird)
+
+  // Overrides for TernaryLike
+
+  override def first: Expression = child
+
+  override def second: Expression = lgConfigKExpression
+
+  override def third: Expression = tgtHllTypeExpression
+
+  // Overrides for TypedImperativeAggregate
+
+  override def prettyName: String = "hll_sketch_agg"
+
+  // The first slot is AnyDataType, so the child's type is not restricted here; types that the
+  // sketch cannot ingest are only rejected later, row by row, inside update().
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, StringType)
+
+  override def dataType: DataType = BinaryType
+
+  override def nullable: Boolean = false
+
+  /**
+   * Instantiate an HllSketch instance using the lgConfigK and tgtHllType params.
+   *
+   * @return an HllSketch instance
+   */
+  override def createAggregationBuffer(): HllSketch = {
+    new HllSketch(lgConfigK, tgtHllType)
+  }
+
+  /**
+   * Evaluate the input row and update the HllSketch instance with the row's value.
+   * Null values are skipped. The update function only supports a subset of Spark SQL types,
+   * and an UnsupportedOperationException will be thrown for unsupported types.
+   *
+   * @param sketch The HllSketch instance.
+   * @param input  an input row
+   * @return the same HllSketch instance that was passed in
+   */
+  override def update(sketch: HllSketch, input: InternalRow): HllSketch = {
+    val v = first.eval(input)
+    if (v != null) {
+      first.dataType match {
+        // Update implemented for a subset of types supported by HllSketch
+        // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out
+        // Leaving out support for Array types, as unique counting these aren't a common use case
+        // Leaving out support for floating point types (IE DoubleType) due to imprecision
+        // TODO: implement support for decimal/datetime/interval types
+        case IntegerType => sketch.update(v.asInstanceOf[Int])
+        case LongType => sketch.update(v.asInstanceOf[Long])
+        case StringType => sketch.update(v.asInstanceOf[UTF8String].toString)
+        case BinaryType => sketch.update(v.asInstanceOf[Array[Byte]])
+        case dataType => throw new UnsupportedOperationException(
+          s"A HllSketch instance cannot be updated with a Spark ${dataType.toString} type")
+      }
+    }
+    sketch
+  }
+
+  /**
+   * Merges an input HllSketch into the sketch which is acting as the aggregation buffer.
+   * A new Union sized by the buffer's lgConfigK is fed both sketches, and its result is
+   * returned in the buffer's target HLL type.
+   *
+   * @param sketch the HllSketch instance used to store the aggregation result.
+   * @param input an input HllSketch instance
+   * @return the HllSketch produced by the union of both arguments
+   */
+  override def merge(sketch: HllSketch, input: HllSketch): HllSketch = {
+    val union = new Union(sketch.getLgConfigK)
+    union.update(sketch)
+    union.update(input)
+    union.getResult(sketch.getTgtHllType)
+  }
+
+  /**
+   * Returns an HllSketch derived from the input column or expression
+   *
+   * @param sketch HllSketch instance used as an aggregation buffer
+   * @return A binary sketch which can be evaluated or merged
+   */
+  override def eval(sketch: HllSketch): Any = {
+    sketch.toCompactByteArray
+  }
+
+  /** Convert the underlying HllSketch into an updatable byte array  */
+  override def serialize(sketch: HllSketch): Array[Byte] = {
+    sketch.toUpdatableByteArray
+  }
+
+  /** De-serializes the updatable byte array into a HllSketch instance */
+  override def deserialize(buffer: Array[Byte]): HllSketch = {
+    HllSketch.heapify(buffer)
+  }
+}
+
+/**
+ * The HllUnionAgg function ingests and merges Datasketches HllSketch
+ * instances previously produced by the HllSketchAgg function, and
+ * outputs the merged HllSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ *
+ * @param child child expression against which unique counting will occur
+ * @param lgMaxK The largest maximum size for lgConfigK for the union operation.

Review Comment:
   I am a bit confused here: let's say that we have a column of sketches with lgk values from 9 to 12 and you pass a value that is greater than 9 (say 11). Then the resulting sketch will have an lgk of 9. In my opinion, hiding this under the hood can be problematic. At the very least, the expression description should explain what happens. Ideally, we should not merge sketches with different lgk values by default (default call to the function), and we should have a separate API that allows forcing downsampling and the resulting loss of sketch precision.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.datasketches.hll.{HllSketch, Union}
+import org.apache.datasketches.memory.Memory
+
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, LongType}
+
+/**
+ * Scalar expression that estimates the number of distinct values summarised by a
+ * Datasketches HllSketch supplied in its binary representation.
+ *
+ * The input bytes are wrapped in a Memory, heapified into an HllSketch, and the sketch's
+ * estimate is rounded to the nearest long.
+ *
+ * @param child expression producing the binary representation of an HllSketch
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr) - Returns the estimated number of unique values given the binary representation
+    of a Datasketches HllSketch. """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(hll_sketch_agg(col1))
+      FROM VALUES (1), (1), (2), (2), (3) tab(col1);
+       3
+  """,
+  group = "misc_funcs",
+  since = "3.5.0")
+case class HllSketchEstimate(child: Expression)
+  extends UnaryExpression
+    with CodegenFallback
+    with ExpectsInputTypes
+    with NullIntolerant {
+
+  override protected def withNewChildInternal(newChild: Expression): HllSketchEstimate =
+    copy(child = newChild)
+
+  override def prettyName: String = "hll_sketch_estimate"
+
+  // The single argument must be the binary form of a sketch.
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+  override def dataType: DataType = LongType
+
+  /**
+   * Heapifies the sketch bytes and returns the rounded cardinality estimate.
+   *
+   * @param input the evaluated child value, cast to a byte array
+   * @return the sketch estimate rounded to a long
+   */
+  override def nullSafeEval(input: Any): Any = {
+    val buffer = input.asInstanceOf[Array[Byte]]
+    Math.round(HllSketch.heapify(Memory.wrap(buffer)).getEstimate)
+  }
+}
+
+@ExpressionDescription(
+  usage = """
+    _FUNC_(left, right) - Merges two binary representations of Datasketches HllSketch objects,
+    using a Datasketches Union object configured with an lgMaxK equal to the min of the
+    HllSketch object's lgConfigK values """,
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(hll_sketch_agg(col1), hll_sketch_agg(col1)))
+      FROM VALUES (1, 4), (1, 4), (2, 5), (2, 5), (3, 6) tab(col1, col2);
+       6
+  """,
+  group = "misc_funcs",
+  since = "3.5.0")
+case class HllUnion(left: Expression, right: Expression)
+  extends BinaryExpression
+    with CodegenFallback
+    with ExpectsInputTypes
+    with NullIntolerant {
+
+  override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression):
+  HllUnion = copy(left = newLeft, right = newRight)
+
+  override def prettyName: String = "hll_union"
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType, BinaryType)
+
+  override def dataType: DataType = BinaryType
+
+  override def nullSafeEval(value1: Any, value2: Any): Any = {
+    val sketch1 = HllSketch.heapify(Memory.wrap(value1.asInstanceOf[Array[Byte]]))
+    val sketch2 = HllSketch.heapify(Memory.wrap(value2.asInstanceOf[Array[Byte]]))
+    val union = new Union(Math.min(sketch1.getLgConfigK, sketch2.getLgConfigK))
+    union.update(sketch1)
+    union.update(sketch2)
+    union.getResult.toUpdatableByteArray

Review Comment:
   Here the merge operation is returned as an updatable sketch. The aggregate version returns a compact sketch.
   I think they should behave the same in terms of the kind of representation returned, and as I point out in my comment in aggregate expressions, I think it is best to use the updatable representation.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.datasketches.hll.{HllSketch, Union}
+import org.apache.datasketches.memory.Memory
+
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, LongType}
+
+/**
+ * Scalar expression that estimates the number of distinct values summarised by a
+ * Datasketches HllSketch supplied in its binary representation.
+ *
+ * The input bytes are wrapped in a Memory, heapified into an HllSketch, and the sketch's
+ * estimate is rounded to the nearest long.
+ *
+ * @param child expression producing the binary representation of an HllSketch
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr) - Returns the estimated number of unique values given the binary representation
+    of a Datasketches HllSketch. """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(hll_sketch_agg(col1))
+      FROM VALUES (1), (1), (2), (2), (3) tab(col1);
+       3
+  """,
+  group = "misc_funcs",
+  since = "3.5.0")
+case class HllSketchEstimate(child: Expression)
+  extends UnaryExpression
+    with CodegenFallback
+    with ExpectsInputTypes
+    with NullIntolerant {
+
+  override protected def withNewChildInternal(newChild: Expression): HllSketchEstimate =
+    copy(child = newChild)
+
+  override def prettyName: String = "hll_sketch_estimate"
+
+  // The single argument must be the binary form of a sketch.
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+  override def dataType: DataType = LongType
+
+  /**
+   * Heapifies the sketch bytes and returns the rounded cardinality estimate.
+   *
+   * @param input the evaluated child value, cast to a byte array
+   * @return the sketch estimate rounded to a long
+   */
+  override def nullSafeEval(input: Any): Any = {
+    val buffer = input.asInstanceOf[Array[Byte]]
+    Math.round(HllSketch.heapify(Memory.wrap(buffer)).getEstimate)
+  }
+}
+
+@ExpressionDescription(
+  usage = """
+    _FUNC_(left, right) - Merges two binary representations of Datasketches HllSketch objects,
+    using a Datasketches Union object configured with an lgMaxK equal to the min of the
+    HllSketch object's lgConfigK values """,
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(hll_sketch_agg(col1), hll_sketch_agg(col1)))
+      FROM VALUES (1, 4), (1, 4), (2, 5), (2, 5), (3, 6) tab(col1, col2);
+       6
+  """,
+  group = "misc_funcs",
+  since = "3.5.0")
+case class HllUnion(left: Expression, right: Expression)
+  extends BinaryExpression
+    with CodegenFallback
+    with ExpectsInputTypes
+    with NullIntolerant {
+
+  override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression):
+  HllUnion = copy(left = newLeft, right = newRight)
+
+  override def prettyName: String = "hll_union"
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType, BinaryType)
+
+  override def dataType: DataType = BinaryType
+
+  override def nullSafeEval(value1: Any, value2: Any): Any = {
+    val sketch1 = HllSketch.heapify(Memory.wrap(value1.asInstanceOf[Array[Byte]]))
+    val sketch2 = HllSketch.heapify(Memory.wrap(value2.asInstanceOf[Array[Byte]]))
+    val union = new Union(Math.min(sketch1.getLgConfigK, sketch2.getLgConfigK))

Review Comment:
   This points back to the same issue I pointed out for the aggregate functions.
   We downsample one of the sketches under the hood without the user knowing. I think we should only allow merging when we have sketches with the same lgk value (if it is really important, have another API/overload for merging sketches with different lgk values).



##########
sql/core/src/test/resources/sql-functions/sql-expression-schema.md:
##########
@@ -422,4 +422,4 @@
 | org.apache.spark.sql.catalyst.expressions.xml.XPathList | xpath | SELECT xpath('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()') | struct<xpath(<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>, a/b/text()):array<string>> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathLong | xpath_long | SELECT xpath_long('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_long(<a><b>1</b><b>2</b></a>, sum(a/b)):bigint> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathShort | xpath_short | SELECT xpath_short('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_short(<a><b>1</b><b>2</b></a>, sum(a/b)):smallint> |
-| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> |
+| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> |

Review Comment:
   How come the two new scalar functions do not appear here?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala:
##########
@@ -507,6 +507,9 @@ object FunctionRegistry {
     expression[RegrSlope]("regr_slope"),
     expression[RegrIntercept]("regr_intercept"),
     expression[Mode]("mode"),
+    expression[HllSketchEstimate]("hllsketch_estimate"),

Review Comment:
   Why do we have this?



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/DatasketchesHllSketchSuite.scala:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import scala.collection.immutable.NumericRange
+import scala.util.Random
+
+import org.apache.datasketches.hll.HllSketch
+import org.apache.datasketches.memory.Memory
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, HllSketchEstimate}
+import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+class DatasketchesHllSketchSuite extends SparkFunSuite {

Review Comment:
   I know that in the previous review round you mentioned that you just want to propagate the errors from the DataSketches library and do not "polish" them. However, I believe the norm now is that we return nice error messages that are actionable and within the new error framework.
   In that respect I think we should actually check the lgk input values for correctness and throw a proper error. Same for the register size (although I think it should go away from the API wherever it appears).



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datasketchesExpressions.scala:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.datasketches.hll.{HllSketch, Union}
+import org.apache.datasketches.memory.Memory
+
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, LongType}
+
+/**
+ * Scalar expression that estimates the number of distinct values summarised by a
+ * Datasketches HllSketch supplied in its binary representation.
+ *
+ * The input bytes are wrapped in a Memory, heapified into an HllSketch, and the sketch's
+ * estimate is rounded to the nearest long.
+ *
+ * @param child expression producing the binary representation of an HllSketch
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr) - Returns the estimated number of unique values given the binary representation
+    of a Datasketches HllSketch. """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(hll_sketch_agg(col1))
+      FROM VALUES (1), (1), (2), (2), (3) tab(col1);
+       3
+  """,
+  group = "misc_funcs",
+  since = "3.5.0")
+case class HllSketchEstimate(child: Expression)
+  extends UnaryExpression
+    with CodegenFallback
+    with ExpectsInputTypes
+    with NullIntolerant {
+
+  override protected def withNewChildInternal(newChild: Expression): HllSketchEstimate =
+    copy(child = newChild)
+
+  override def prettyName: String = "hll_sketch_estimate"
+
+  // The single argument must be the binary form of a sketch.
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+  override def dataType: DataType = LongType
+
+  /**
+   * Heapifies the sketch bytes and returns the rounded cardinality estimate.
+   *
+   * @param input the evaluated child value, cast to a byte array
+   * @return the sketch estimate rounded to a long
+   */
+  override def nullSafeEval(input: Any): Any = {
+    val buffer = input.asInstanceOf[Array[Byte]]
+    Math.round(HllSketch.heapify(Memory.wrap(buffer)).getEstimate)
+  }
+}
+
+@ExpressionDescription(
+  usage = """
+    _FUNC_(left, right) - Merges two binary representations of Datasketches HllSketch objects,
+    using a Datasketches Union object configured with an lgMaxK equal to the min of the
+    HllSketch object's lgConfigK values """,
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(hll_sketch_agg(col1), hll_sketch_agg(col1)))
+      FROM VALUES (1, 4), (1, 4), (2, 5), (2, 5), (3, 6) tab(col1, col2);
+       6
+  """,
+  group = "misc_funcs",
+  since = "3.5.0")
+case class HllUnion(left: Expression, right: Expression)
+  extends BinaryExpression
+    with CodegenFallback

Review Comment:
   Same here.



##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##########
@@ -1545,6 +1545,203 @@ class DataFrameAggregateSuite extends QueryTest
     )
     checkAnswer(res, Row(Array(1), Array(1)))
   }
+
+  test("SPARK-16484: hll_*_agg + hll_union + hll_sketch_estimate positive tests") {

Review Comment:
   I believe you are not done with the tests yet, but please let me make an observation: all positive queries below involve a grouping aggregation. I believe we should also have tests with non-grouping aggregations.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+
+import java.util.Locale
+
+import org.apache.datasketches.SketchesArgumentException
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.Memory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, TernaryLike}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, BinaryType, DataType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * The HllSketchAgg function utilizes a Datasketches HllSketch instance to
+ * probabilistically count the number of unique values in a given column, and
+ * outputs the binary representation of the HllSketch.
+ *
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more information
+ *
+ * @param child child expression against which unique counting will occur
+ * @param lgConfigK the log-base-2 of K, where K is the number of buckets or slots for the sketch
+ * @param tgtHllType the target type of the HllSketch to be used (HLL_4, HLL_6, HLL_8)
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr, lgConfigK, tgtHllType) - Returns the HllSketch's compact binary representation.
+      `lgConfigK` (optional) the log-base-2 of K, with K = the number of buckets for the HllSketch.
+      `tgtHllType` (optional) the target type of the HllSketch (HLL_4, HLL_6, HLL_8). """,
+  examples = """
+    Examples:
+      > SELECT hll_sketch_estimate(_FUNC_(col1, 12, 'HLL_4'))
+      FROM VALUES (1), (1), (2), (2), (3) tab(col1);
+       3
+  """,
+  group = "agg_funcs",
+  since = "3.5.0")
+case class HllSketchAgg(
+    child: Expression,
+    lgConfigKExpression: Expression,
+    tgtHllTypeExpression: Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends TypedImperativeAggregate[HllSketch] with TernaryLike[Expression] with ExpectsInputTypes {
+
+  // HllSketch config - marked as lazy so that the argument expressions are not evaluated
+  // during tree transformation.
+
+  /** The log-base-2 of K, read from the second child expression. */
+  lazy val lgConfigK: Int = second.eval().asInstanceOf[Int]
+
+  /**
+   * The target HLL type, parsed case-insensitively from the third child expression.
+   * A null value is reported with the same SketchesArgumentException as an unrecognised
+   * name, instead of surfacing as a NullPointerException from the dereference.
+   */
+  lazy val tgtHllType: TgtHllType = third.eval() match {
+    case null => throw new SketchesArgumentException("Invalid tgtHllType value")
+    case name =>
+      try {
+        TgtHllType.valueOf(name.asInstanceOf[UTF8String].toString.toUpperCase(Locale.ROOT))
+      } catch {
+        case _: IllegalArgumentException =>
+          throw new SketchesArgumentException("Invalid tgtHllType value")
+      }
+  }
+
+  // Auxiliary constructors: every overload forwards to the primary constructor with both
+  // aggregation buffer offsets set to 0, filling in HllSketch defaults for omitted arguments.
+
+  def this(child: Expression) =
+    this(child, Literal(HllSketch.DEFAULT_LG_K), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+
+  def this(child: Expression, lgConfigK: Expression) =
+    this(child, lgConfigK, Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+
+  def this(child: Expression, lgConfigK: Int) =
+    this(child, Literal(lgConfigK), Literal(HllSketch.DEFAULT_HLL_TYPE.toString), 0, 0)
+
+  def this(child: Expression, lgConfigK: Expression, tgtHllType: Expression) =
+    this(child, lgConfigK, tgtHllType, 0, 0)
+
+  def this(child: Expression, lgConfigK: Int, tgtHllType: String) =
+    this(child, Literal(lgConfigK), Literal(tgtHllType), 0, 0)
+
+  // Copy constructors required by ImperativeAggregate; each returns a copy of this
+  // expression with only the named field(s) replaced.
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): HllSketchAgg = {
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+  }
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): HllSketchAgg = {
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+  }
+
+  override protected def withNewChildrenInternal(
+      newFirst: Expression,
+      newSecond: Expression,
+      newThird: Expression): HllSketchAgg = {
+    copy(child = newFirst, lgConfigKExpression = newSecond, tgtHllTypeExpression = newThird)
+  }
+
+  // Overrides for TernaryLike: child positions map to (value, lgConfigK, tgtHllType)
+
+  override def first: Expression = child // expression whose values feed the sketch
+
+  override def second: Expression = lgConfigKExpression // evaluated lazily into lgConfigK
+
+  override def third: Expression = tgtHllTypeExpression // evaluated lazily into tgtHllType
+
+  // Overrides for TypedImperativeAggregate; inputTypes lists one entry per child, in order
+
+  override def prettyName: String = "hll_sketch_agg"
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, StringType)
+
+  override def dataType: DataType = BinaryType // the aggregate yields a serialized sketch
+
+  override def nullable: Boolean = false
+
+  /**
+   * Builds a fresh aggregation buffer.
+   *
+   * @return a new HllSketch configured with this expression's lgConfigK and tgtHllType
+   */
+  override def createAggregationBuffer(): HllSketch = new HllSketch(lgConfigK, tgtHllType)
+
+  /**
+   * Evaluate the input row and update the HllSketch instance with the row's value.
+   * Null values are skipped. The update function only supports a subset of Spark SQL
+   * types, and an UnsupportedOperationException will be thrown for unsupported types.
+   *
+   * @param sketch The HllSketch instance.
+   * @param input  an input row
+   * @return the same HllSketch instance that was passed in
+   */
+  override def update(sketch: HllSketch, input: InternalRow): HllSketch = {
+    val v = first.eval(input)
+    if (v != null) {
+      first.dataType match {
+        // Update implemented for a subset of types supported by HllSketch
+        // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out
+        // Leaving out support for Array types, as unique counting these aren't a common use case
+        // Leaving out support for floating point types (IE DoubleType) due to imprecision
+        // TODO: implement support for decimal/datetime/interval types
+        case IntegerType => sketch.update(v.asInstanceOf[Int])
+        case LongType => sketch.update(v.asInstanceOf[Long])
+        case StringType => sketch.update(v.asInstanceOf[UTF8String].toString)
+        case BinaryType => sketch.update(v.asInstanceOf[Array[Byte]])
+        // Bound to a fresh name so the pattern does not shadow this expression's own dataType.
+        case other => throw new UnsupportedOperationException(
+          s"A HllSketch instance cannot be updated with a Spark ${other.toString} type")
+      }
+    }
+    sketch
+  }
+
+  /**
+   * Combines the aggregation buffer with another partial sketch through a Datasketches Union.
+   *
+   * @param sketch the HllSketch instance used to store the aggregation result.
+   * @param input an input HllSketch instance
+   * @return the union's result, requested with the buffer sketch's target HLL type
+   */
+  override def merge(sketch: HllSketch, input: HllSketch): HllSketch = {
+    // The union's lgMaxK is taken from the buffer sketch.
+    val combined = new Union(sketch.getLgConfigK)
+    Seq(sketch, input).foreach(s => combined.update(s))
+    combined.getResult(sketch.getTgtHllType)
+  }
+
+  /**
+   * Returns an HllSketch derived from the input column or expression
+   *
+   * @param sketch HllSketch instance used as an aggregation buffer
+   * @return A binary sketch which can be evaluated or merged
+   */
+  override def eval(sketch: HllSketch): Any = {
+    sketch.toCompactByteArray

Review Comment:
   What is the logic behind the choice to return a compact representation?
   Why not stick with the updatable one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org