You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "dtenedor (via GitHub)" <gi...@apache.org> on 2023/03/31 20:01:40 UTC

[GitHub] [spark] dtenedor commented on a diff in pull request #40615: [WIP][SPARK-16484][SQL] Add support for Datasketches HllSketch

dtenedor commented on code in PR #40615:
URL: https://github.com/apache/spark/pull/40615#discussion_r1154843262


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.WritableMemory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * This datasketchesAggregates file is intended to encapsulate all of the
+ * aggregate functions that utilize Datasketches sketch objects as intermediate
+ * aggregation buffers.
+ *
+ * The HllSketchAggregate sealed trait is meant to be extended by the aggregate
+ * functions which utilize instances of HllSketch to count uniques.
+ */
+sealed trait HllSketchAggregate
+  extends TypedImperativeAggregate[HllSketch] with UnaryLike[Expression] {
+
+  // These are used as params when instantiating the HllSketch object
+  // and are set by the case classes' args that extend this trait
+
+  def lgConfigK: Int
+  def tgtHllType: String
+
+  // From here on, these are the shared default implementations for TypedImperativeAggregate
+
+  /** Aggregate functions which utilize HllSketch instances should never return null */
+  override def nullable: Boolean = false
+
+  /**
+   * Instantiate an HllSketch instance using the lgConfigK and tgtHllType params.
+   *
+   * @return an HllSketch instance
+   */
+  override def createAggregationBuffer(): HllSketch = {
+    // scalastyle:off caselocale
+    new HllSketch(lgConfigK, TgtHllType.valueOf(tgtHllType.toUpperCase))
+    // scalastyle:on caselocale
+  }
+
+  /**
+   * Evaluate the input row and update the HllSketch instance with the row's value.
+   * The update function only supports a subset of Spark SQL types, and an
+   * UnsupportedOperationException will be thrown for unsupported types.
+   *
+   * @param sketch The HllSketch instance.
+   * @param input  an input row
+   */
+  override def update(sketch: HllSketch, input: InternalRow): HllSketch = {
+    val v = child.eval(input)
+    if (v != null) {
+      child.dataType match {
+        // Update implemented for all types supported by HllSketch
+        // Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out
+        case IntegerType => sketch.update(v.asInstanceOf[Int])
+        case LongType => sketch.update(v.asInstanceOf[Long])
+        case DoubleType => sketch.update(v.asInstanceOf[Double])

Review Comment:
   I don't think it is a good idea to support IEEE floating-point input types for these functions. There is imprecision when comparing them to each other which can lead to instability in the results. By the same token, performing a GROUP BY operation by floating-point values is not a good idea. For reference, see these function definitions which do not include floating-point input types [1].
   
   On the other hand, I don't see any reason why we can't support DecimalType, Datetime or Interval types (we can leave a TODO comment to support those in another PR if we want).
   
   [1] https://github.com/google/zetasql/blob/master/docs/hll_functions.md



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.WritableMemory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * This datasketchesAggregates file is intended to encapsulate all of the
+ * aggregate functions that utilize Datasketches sketch objects as intermediate
+ * aggregation buffers.
+ *
+ * The HllSketchAggregate sealed trait is meant to be extended by the aggregate
+ * functions which utilize instances of HllSketch to count uniques.
+ */
+sealed trait HllSketchAggregate
+  extends TypedImperativeAggregate[HllSketch] with UnaryLike[Expression] {
+
+  // These are used as params when instantiating the HllSketch object
+  // and are set by the case classes' args that extend this trait
+
+  def lgConfigK: Int
+  def tgtHllType: String
+
+  // From here on, these are the shared default implementations for TypedImperativeAggregate
+
+  /** Aggregate functions which utilize HllSketch instances should never return null */
+  override def nullable: Boolean = false
+
+  /**
+   * Instantiate an HllSketch instance using the lgConfigK and tgtHllType params.
+   *
+   * @return an HllSketch instance
+   */
+  override def createAggregationBuffer(): HllSketch = {
+    // scalastyle:off caselocale

Review Comment:
   should we just do `toUpperCase(Locale.Root)` instead of disabling this style check?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.WritableMemory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * This datasketchesAggregates file is intended to encapsulate all of the
+ * aggregate functions that utilize Datasketches sketch objects as intermediate
+ * aggregation buffers.
+ *
+ * The HllSketchAggregate sealed trait is meant to be extended by the aggregate
+ * functions which utilize instances of HllSketch to count uniques.
+ */
+sealed trait HllSketchAggregate
+  extends TypedImperativeAggregate[HllSketch] with UnaryLike[Expression] {
+
+  // These are used as params when instantiating the HllSketch object
+  // and are set by the case classes' args that extend this trait
+
+  def lgConfigK: Int
+  def tgtHllType: String
+
+  // From here on, these are the shared default implementations for TypedImperativeAggregate
+
+  /** Aggregate functions which utilize HllSketch instances should never return null */

Review Comment:
   what if the input is null or empty? Is they result simply zero?



##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##########
@@ -1545,6 +1545,59 @@ class DataFrameAggregateSuite extends QueryTest
     )
     checkAnswer(res, Row(Array(1), Array(1)))
   }
+
+  test("SPARK-16484: hllsketch_estimate positive tests") {

Review Comment:
   we are going to need more test cases :) some ideas:
   
   * empty input table
   * use SQL to query the aggregate functions
   * explicit regular integers vs. long integers in the input table
   * minimum and maximum supported values for each data type



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org