Posted to reviews@spark.apache.org by "LuciferYang (via GitHub)" <gi...@apache.org> on 2023/03/09 11:41:25 UTC

[GitHub] [spark] LuciferYang opened a new pull request, #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

LuciferYang opened a new pull request, #40352:
URL: https://github.com/apache/spark/pull/40352

   ### What changes were proposed in this pull request?
   This PR uses `BloomFilterAggregate` to implement the `bloomFilter` function for `DataFrameStatFunctions`. Since `BloomFilterAggregate` is an internal function, this PR adds a function registration step for it in `SparkConnectService#newIsolatedSession`.
   
   
   
   ### Why are the changes needed?
   Adds Spark Connect JVM client API coverage.
    
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   
   - Added a new test
   - Manually checked with Scala 2.13
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131458023


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala:
##########
@@ -584,6 +585,101 @@ final class DataFrameStatFunctions private[sql] (sparkSession: SparkSession, roo
     }
     CountMinSketch.readFrom(ds.head())
   }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, numBits, Double.NaN)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, numBits, Double.NaN)
+  }
+
+  private def buildBloomFilter(
+      col: Column,
+      expectedNumItems: Long,
+      numBits: Long,
+      fpp: Double): BloomFilter = {
+
+    def optimalNumOfBits(n: Long, p: Double): Long =

Review Comment:
   Can we move this to the server side?
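
For context, the helper under discussion is the standard Bloom filter sizing formula, m = -n ln(p) / (ln 2)^2; its body is quoted in another hunk of this thread. A minimal standalone sketch follows. The `BloomSizing` object name and the `require` checks are illustrative assumptions, not part of the PR.

```scala
// Standalone sketch of the Bloom filter sizing formula quoted in this thread.
// `BloomSizing` and the `require` checks are illustrative, not part of the PR.
object BloomSizing {

  /** Bits needed for `n` expected items at false positive probability `p`. */
  def optimalNumOfBits(n: Long, p: Double): Long = {
    require(n > 0L, "expected number of items must be positive")
    require(p > 0.0 && p < 1.0, "fpp must be in (0, 1)")
    (-n * Math.log(p) / (Math.log(2) * Math.log(2))).toLong
  }
}
```

For example, `BloomSizing.optimalNumOfBits(1000L, 0.03)` comes to 7298 bits, roughly 7.3 bits per expected item. Because the result depends only on the two arguments, the computation can live on either the client or the server.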





[GitHub] [spark] hvanhovell commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1160392526


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1154,6 +1155,91 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(Lead(children.head, children(1), children(2), ignoreNulls))
 
+      case "bloom_filter_agg" if fun.getArgumentsCount == 5 =>

Review Comment:
   TBH I think we need to get rid of all this manual function resolution, and try to fold it back into the FunctionRegistry.
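
To illustrate the distinction drawn here, the sketch below contrasts a hand-written `match` on function names with a lookup in a name-to-builder map. It is a toy model only: `Expr`, `Call`, `ToyRegistry`, and the registered builder are hypothetical stand-ins and do not reflect Spark's actual `FunctionRegistry` API.

```scala
// Toy model: every name here is hypothetical, not Spark's FunctionRegistry API.
object ToyRegistry {
  sealed trait Expr
  final case class Lit(value: Long) extends Expr
  final case class Call(name: String, args: Seq[Expr]) extends Expr

  type Builder = Seq[Expr] => Expr

  // Manual resolution: each special function needs its own case in the planner.
  def resolveManually(name: String, args: Seq[Expr]): Option[Expr] = name match {
    case "bloom_filter_agg" if args.length == 3 => Some(Call("BloomFilterAggregate", args))
    case _ => None
  }

  // Registry-based resolution: functions are registered once and looked up by name.
  private val builders = scala.collection.mutable.Map.empty[String, Builder]

  def register(name: String)(builder: Builder): Unit = {
    builders(name) = builder
  }

  def lookup(name: String, args: Seq[Expr]): Option[Expr] =
    builders.get(name).map(build => build(args))

  // Registration happens in one place instead of growing the planner's match.
  register("bloom_filter_agg")(args => Call("BloomFilterAggregate", args))
}
```

With a registry, adding a function means adding one registration rather than another planner case, which appears to be the direction this comment is pointing at.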





[GitHub] [spark] LuciferYang closed pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang closed pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`
URL: https://github.com/apache/spark/pull/40352




[GitHub] [spark] hvanhovell commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1160392895


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1073,6 +1074,91 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(Lead(children.head, children(1), children(2), ignoreNulls))
 
+      case "bloom_filter_agg" if fun.getArgumentsCount == 5 =>
+        // [col, catalogString: String, expectedNumItems: Long, numBits: Long, fpp: Double]
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        val dt = {
+          val ddl = children(1) match {
+            case StringLiteral(s) => s
+            case other =>
+              throw InvalidPlanInput(s"col dataType should be a literal string, but got $other")
+          }
+          DataType.fromDDL(ddl)
+        }
+        val col = dt match {
+          case IntegerType | ShortType | ByteType => Cast(children.head, LongType)
+          case LongType | StringType => children.head
+          case other =>
+            throw InvalidPlanInput(
+              s"Bloom filter only supports integral types, " +
+                s"and does not support type $other.")
+        }
+
+        val fpp = children(4) match {
+          case DoubleLiteral(d) => d
+          case _ =>
+            throw InvalidPlanInput("False positive must be double literal.")
+        }
+
+        if (fpp.isNaN) {
+          // Use expectedNumItems and numBits when `fpp.isNaN` if true.
+          // Check expectedNumItems > 0L
+          val expectedNumItemsExpr = children(2)
+          val expectedNumItems = expectedNumItemsExpr match {
+            case Literal(l: Long, LongType) => l
+            case _ =>
+              throw InvalidPlanInput("Expected insertions must be long literal.")
+          }
+          if (expectedNumItems <= 0L) {
+            throw InvalidPlanInput("Expected insertions must be positive.")
+          }
+          // Check numBits > 0L
+          val numBitsExpr = children(3)
+          val numBits = numBitsExpr match {
+            case Literal(l: Long, LongType) => l
+            case _ =>
+              throw InvalidPlanInput("Number of bits must be long literal.")
+          }
+          if (numBits <= 0L) {
+            throw InvalidPlanInput("Number of bits must be positive.")
+          }
+          // Create BloomFilterAggregate with expectedNumItemsExpr and numBitsExpr.
+          Some(
+            new BloomFilterAggregate(col, expectedNumItemsExpr, numBitsExpr)
+              .toAggregateExpression())
+
+        } else {
+          def optimalNumOfBits(n: Long, p: Double): Long =

Review Comment:
   Not public, just `private[spark]`.





[GitHub] [spark] LuciferYang commented on pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40352:
URL: https://github.com/apache/spark/pull/40352#issuecomment-1671392299

   This PR has been rebased many times; should I submit a new one?




[GitHub] [spark] hvanhovell commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131459693


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala:
##########
@@ -584,6 +585,101 @@ final class DataFrameStatFunctions private[sql] (sparkSession: SparkSession, roo
     }
     CountMinSketch.readFrom(ds.head())
   }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, numBits, Double.NaN)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, numBits, Double.NaN)
+  }
+
+  private def buildBloomFilter(
+      col: Column,
+      expectedNumItems: Long,
+      numBits: Long,
+      fpp: Double): BloomFilter = {
+
+    def optimalNumOfBits(n: Long, p: Double): Long =
+      (-n * Math.log(p) / (Math.log(2) * Math.log(2))).toLong
+
+    val nBits = if (fpp.isNaN) {

Review Comment:
   Prefer to check the value itself instead of the other one... you will need to add an isNaN check to the else clause here.
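
One possible reading of this suggestion, sketched as standalone Scala: branch on `numBits` (the value actually used) rather than on `fpp`, and validate `fpp` only in the fallback branch. The sentinels (`-1L` for a missing `numBits`, `Double.NaN` for a missing `fpp`) follow the overloads in the diff above; the `NumBitsChoice` object, the `chooseNumBits` name, and the exact control flow are illustrative assumptions, not the merged code.

```scala
// Illustrative only: the control flow is an assumption about what the review asks for.
object NumBitsChoice {
  private def optimalNumOfBits(n: Long, p: Double): Long =
    (-n * Math.log(p) / (Math.log(2) * Math.log(2))).toLong

  /** `numBits == -1L` means "not provided"; `fpp` is NaN when `numBits` was provided. */
  def chooseNumBits(expectedNumItems: Long, numBits: Long, fpp: Double): Long =
    if (numBits > 0L) {
      // Decide on the value that this branch actually returns.
      numBits
    } else {
      // The fallback needs a usable fpp, hence the isNaN check in the else clause.
      require(!fpp.isNaN, "either numBits or fpp must be provided")
      optimalNumOfBits(expectedNumItems, fpp)
    }
}
```

Written this way, a call that supplies neither a positive `numBits` nor a real `fpp` fails fast instead of silently producing a size derived from NaN.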





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1160631080


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1073,6 +1074,91 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(Lead(children.head, children(1), children(2), ignoreNulls))
 
+      case "bloom_filter_agg" if fun.getArgumentsCount == 5 =>
+        // [col, catalogString: String, expectedNumItems: Long, numBits: Long, fpp: Double]
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        val dt = {
+          val ddl = children(1) match {
+            case StringLiteral(s) => s
+            case other =>
+              throw InvalidPlanInput(s"col dataType should be a literal string, but got $other")
+          }
+          DataType.fromDDL(ddl)
+        }
+        val col = dt match {
+          case IntegerType | ShortType | ByteType => Cast(children.head, LongType)
+          case LongType | StringType => children.head
+          case other =>
+            throw InvalidPlanInput(
+              s"Bloom filter only supports integral types, " +
+                s"and does not support type $other.")
+        }
+
+        val fpp = children(4) match {
+          case DoubleLiteral(d) => d
+          case _ =>
+            throw InvalidPlanInput("False positive must be double literal.")
+        }
+
+        if (fpp.isNaN) {
+          // Use expectedNumItems and numBits when `fpp.isNaN` if true.
+          // Check expectedNumItems > 0L
+          val expectedNumItemsExpr = children(2)
+          val expectedNumItems = expectedNumItemsExpr match {
+            case Literal(l: Long, LongType) => l
+            case _ =>
+              throw InvalidPlanInput("Expected insertions must be long literal.")
+          }
+          if (expectedNumItems <= 0L) {
+            throw InvalidPlanInput("Expected insertions must be positive.")
+          }
+          // Check numBits > 0L
+          val numBitsExpr = children(3)
+          val numBits = numBitsExpr match {
+            case Literal(l: Long, LongType) => l
+            case _ =>
+              throw InvalidPlanInput("Number of bits must be long literal.")
+          }
+          if (numBits <= 0L) {
+            throw InvalidPlanInput("Number of bits must be positive.")
+          }
+          // Create BloomFilterAggregate with expectedNumItemsExpr and numBitsExpr.
+          Some(
+            new BloomFilterAggregate(col, expectedNumItemsExpr, numBitsExpr)
+              .toAggregateExpression())
+
+        } else {
+          def optimalNumOfBits(n: Long, p: Double): Long =

Review Comment:
   Make this Java `package` (package-private) access scope and introduce `BloomFilterHelper` in the connect-server module so that it is accessible from the `spark` package.






[GitHub] [spark] hvanhovell commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1160395012


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala:
##########
@@ -78,7 +79,7 @@ case class BloomFilterAggregate(
             "exprName" -> "estimatedNumItems or numBits"
           )
         )
-      case (LongType, LongType, LongType) =>
+      case (LongType, LongType, LongType) | (StringType, LongType, LongType) =>

Review Comment:
   Alternatively: `case (LongType | StringType, LongType, LongType) =>`
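
Both spellings match the same inputs, because Scala allows a pattern alternative (`|`) to be nested inside a tuple pattern. A small self-contained sketch of this, using a toy `DType` hierarchy as a stand-in for Spark's `DataType` objects (an assumption made so the example runs without Spark):

```scala
// Toy stand-ins for Spark's LongType / StringType / DoubleType; not Spark classes.
object PatternAlternatives {
  sealed trait DType
  case object LongT extends DType
  case object StringT extends DType
  case object DoubleT extends DType

  // Alternative between two whole tuple patterns, as in the diff.
  def acceptsVerbose(t: (DType, DType, DType)): Boolean = t match {
    case (LongT, LongT, LongT) | (StringT, LongT, LongT) => true
    case _ => false
  }

  // Alternative nested in the first tuple position, as suggested in the review.
  def acceptsCompact(t: (DType, DType, DType)): Boolean = t match {
    case (LongT | StringT, LongT, LongT) => true
    case _ => false
  }
}
```

The compact form states the intent more directly: only the first position may vary, while the second and third must both be long.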





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1132193330


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala:
##########
@@ -584,6 +585,97 @@ final class DataFrameStatFunctions private[sql] (sparkSession: SparkSession, roo
     }
     CountMinSketch.readFrom(ds.head())
   }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, numBits, Double.NaN)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, numBits, Double.NaN)
+  }
+
+  private def buildBloomFilter(
+      col: Column,
+      expectedNumItems: Long,
+      numBits: Long,
+      fpp: Double): BloomFilter = {
+
+    val dataType = sparkSession

Review Comment:
   Added this in order to:
   1. check the supported types of `col` on the server side
   2. add a `Cast` for IntegerType/ShortType/ByteType
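
The two points can be illustrated with a toy model of the server-side dispatch shown in the `SparkConnectPlanner` hunks elsewhere in this thread: narrower integral types are widened to long, long and string pass through unchanged, and anything else is rejected. `ColType`, `Prepared`, and `prepare` are hypothetical stand-ins so the sketch runs without Spark; the real code operates on Catalyst expressions and `Cast`.

```scala
// Toy model of the type check and widening; none of these names are Spark classes.
object ToyColumnPrep {
  sealed trait ColType
  case object ByteT extends ColType
  case object ShortT extends ColType
  case object IntT extends ColType
  case object LongT extends ColType
  case object StringT extends ColType
  case object DoubleT extends ColType

  sealed trait Prepared
  final case class AsIs(t: ColType) extends Prepared
  final case class CastToLong(from: ColType) extends Prepared

  /** Returns how the column would be fed to the aggregate, or an error message. */
  def prepare(t: ColType): Either[String, Prepared] = t match {
    case IntT | ShortT | ByteT => Right(CastToLong(t)) // widen narrower integral types
    case LongT | StringT => Right(AsIs(t))             // already supported as-is
    case other => Left(s"Bloom filter does not support type $other.")
  }
}
```

Here `ToyColumnPrep.prepare(ToyColumnPrep.IntT)` yields a `CastToLong`, while `ToyColumnPrep.prepare(ToyColumnPrep.DoubleT)` yields a `Left` carrying the error message.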







[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1145683804


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala:
##########
@@ -584,6 +585,97 @@ final class DataFrameStatFunctions private[sql] (sparkSession: SparkSession, roo
     }
     CountMinSketch.readFrom(ds.head())
   }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.5.0

Review Comment:
   Changed `@since` from 3.4.0 to 3.5.0.





[GitHub] [spark] LuciferYang commented on pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40352:
URL: https://github.com/apache/spark/pull/40352#issuecomment-1500147099

   The GA (GitHub Actions) failure is not related to the current PR.
   




[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1160566049


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala:
##########
@@ -78,7 +79,7 @@ case class BloomFilterAggregate(
             "exprName" -> "estimatedNumItems or numBits"
           )
         )
-      case (LongType, LongType, LongType) =>
+      case (LongType, LongType, LongType) | (StringType, LongType, LongType) =>

Review Comment:
   Thanks, fixed





[GitHub] [spark] hvanhovell commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1160389178


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala:
##########
@@ -584,6 +585,97 @@ final class DataFrameStatFunctions private[sql] (sparkSession: SparkSession, roo
     }
     CountMinSketch.readFrom(ds.head())
   }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, numBits, Double.NaN)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, numBits, Double.NaN)
+  }
+
+  private def buildBloomFilter(
+      col: Column,
+      expectedNumItems: Long,
+      numBits: Long,
+      fpp: Double): BloomFilter = {
+
+    val dataType = sparkSession

Review Comment:
   Can we use the `TypeOf` expression for this instead? Alternatively we can try to figure this out in the planner.





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1160563946


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1154,6 +1155,91 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(Lead(children.head, children(1), children(2), ignoreNulls))
 
+      case "bloom_filter_agg" if fun.getArgumentsCount == 5 =>

Review Comment:
   `bloom_filter_agg` is an internal function that users are not allowed to call directly, so "fold it back into the FunctionRegistry" may require some additional work.
   
   
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131904105


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala:
##########
@@ -176,4 +176,31 @@ class DataFrameStatSuite extends RemoteSparkSession {
     assert(sketch.relativeError() === 0.001)
     assert(sketch.confidence() === 0.99 +- 5e-3)
   }
+
+  // This test only verifies some basic requirements, more correctness tests can be found in
+  // `BloomFilterSuite` in project spark-sketch.
+  test("Bloom filter") {

Review Comment:
   Let me add them





[GitHub] [spark] LuciferYang commented on pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40352:
URL: https://github.com/apache/spark/pull/40352#issuecomment-1671422925

   I made a clean one: https://github.com/apache/spark/pull/42414




[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1160620933


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala:
##########
@@ -584,6 +585,86 @@ final class DataFrameStatFunctions private[sql] (sparkSession: SparkSession, roo
     }
     CountMinSketch.readFrom(ds.head())
   }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.5.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.5.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.5.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, numBits, Double.NaN)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.5.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, numBits, Double.NaN)
+  }
+
+  private def buildBloomFilter(
+      col: Column,
+      expectedNumItems: Long,
+      numBits: Long,
+      fpp: Double): BloomFilter = {
+
+    val agg = if (!fpp.isNaN) {

Review Comment:
   Before the [change to always pass 3 parameters](https://github.com/apache/spark/pull/40352/commits/5bd616954aaef21b88b161e20e0f49bc067cae0c), all 4 parameters were always passed.
   
   Now it passes (`col`, `expectedNumItems`, `fpp`) if `!fpp.isNaN`, and (`col`, `expectedNumItems`, `numBits`) otherwise.
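
   A minimal sketch of that argument selection, using hypothetical stand-in types (`SizeSpec`, `ByFpp`, `ByNumBits`, `BloomFilterArgs`) rather than the real Connect client classes. It assumes, as in the quoted diff, that `Double.NaN` marks "no `fpp` supplied":

   ```scala
   // Hypothetical stand-ins for the two argument shapes; not part of the Spark API.
   sealed trait SizeSpec
   final case class ByFpp(expectedNumItems: Long, fpp: Double) extends SizeSpec
   final case class ByNumBits(expectedNumItems: Long, numBits: Long) extends SizeSpec

   object BloomFilterArgs {
     // Double.NaN is the sentinel for "fpp not given", mirroring the overloads in the diff.
     def select(expectedNumItems: Long, numBits: Long, fpp: Double): SizeSpec =
       if (!fpp.isNaN) ByFpp(expectedNumItems, fpp)
       else ByNumBits(expectedNumItems, numBits)
   }
   ```

   With this shape, `BloomFilterArgs.select(1000L, -1L, 0.03)` picks the `fpp` variant and `BloomFilterArgs.select(1000L, 8192L, Double.NaN)` picks the `numBits` variant, so only three values ever need to be sent.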
   
   





[GitHub] [spark] hvanhovell commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131455328


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1073,6 +1074,12 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(Lead(children.head, children(1), children(2), ignoreNulls))
 
+      case "bloom_filter_agg" if fun.getArgumentsCount == 3 =>
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        Some(
+          new BloomFilterAggregate(children.head, children(1), children(2))

Review Comment:
   There is a small issue here. The aggregate requires the first input to be a Long. `DataFrameStatFunctions.bloomFilter` supports `Byte`, `Short`, `Int`, `Long`, and `String`. While we can simply add a cast to long for the first 4, string will be an issue. We need to adapt `BloomFilterAggregate` to make it fully compatible.
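
   The type concern can be illustrated with a small sketch. `BloomInput`, `AsLong`, `AsBytes`, and `BloomInputs.normalize` are hypothetical names, and encoding strings as UTF-8 bytes is an assumption made for illustration, not necessarily what `BloomFilterAggregate` does:

   ```scala
   import java.nio.charset.StandardCharsets

   // Hypothetical normalized inputs: integral values widen to Long, strings take a separate path.
   sealed trait BloomInput
   final case class AsLong(value: Long) extends BloomInput
   final case class AsBytes(value: Array[Byte]) extends BloomInput

   object BloomInputs {
     def normalize(value: Any): BloomInput = value match {
       case null => throw new IllegalArgumentException("null input is not supported")
       // Widening Byte/Short/Int to Long preserves the value, so a cast is enough.
       case b: Byte => AsLong(b.toLong)
       case s: Short => AsLong(s.toLong)
       case i: Int => AsLong(i.toLong)
       case l: Long => AsLong(l)
       // A String has no lossless cast to Long, so it needs its own representation.
       case s: String => AsBytes(s.getBytes(StandardCharsets.UTF_8))
       case other =>
         throw new IllegalArgumentException(
           s"Unsupported Bloom filter input type: ${other.getClass.getName}")
     }
   }
   ```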





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131904105


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala:
##########
@@ -176,4 +176,31 @@ class DataFrameStatSuite extends RemoteSparkSession {
     assert(sketch.relativeError() === 0.001)
     assert(sketch.confidence() === 0.99 +- 5e-3)
   }
+
+  // This test only verifies some basic requirements, more correctness tests can be found in
+  // `BloomFilterSuite` in project spark-sketch.
+  test("Bloom filter") {

Review Comment:
   Let me add those cases





[GitHub] [spark] hvanhovell commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131460103


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala:
##########
@@ -176,4 +176,31 @@ class DataFrameStatSuite extends RemoteSparkSession {
     assert(sketch.relativeError() === 0.001)
     assert(sketch.confidence() === 0.99 +- 5e-3)
   }
+
+  // This test only verifies some basic requirements, more correctness tests can be found in
+  // `BloomFilterSuite` in project spark-sketch.
+  test("Bloom filter") {

Review Comment:
   Add tests for byte/short/int?





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131897415


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1073,6 +1074,12 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(Lead(children.head, children(1), children(2), ignoreNulls))
 
+      case "bloom_filter_agg" if fun.getArgumentsCount == 3 =>
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        Some(
+          new BloomFilterAggregate(children.head, children(1), children(2))

Review Comment:
   > There is a small issue here. The aggregate requires the first input to be a Long. `DataFrameStatFunctions.bloomFilter` supports `Byte`, `Short`, `Int`, `Long`, and `String`. While we can simply add a cast to long for the first 4, string will be an issue. We need adapt the BloomFilterAggregate to make it fully compatible.
   
   So maybe adding a new protobuf message is a simpler way? `BloomFilterAggregate` is an internal function used by `InjectRuntimeFilter`, and `InjectRuntimeFilter` hashes both the input column and the might-contain values, so I think there is no String-typed input in that scenario?





[GitHub] [spark] hvanhovell commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1160394607


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1154,6 +1155,91 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(Lead(children.head, children(1), children(2), ignoreNulls))
 
+      case "bloom_filter_agg" if fun.getArgumentsCount == 5 =>
+        // [col, catalogString: String, expectedNumItems: Long, numBits: Long, fpp: Double]
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        val dt = {
+          val ddl = children(1) match {
+            case StringLiteral(s) => s
+            case other =>
+              throw InvalidPlanInput(s"col dataType should be a literal string, but got $other")
+          }
+          DataType.fromDDL(ddl)
+        }
+        val col = dt match {

Review Comment:
   I wonder if we can solve this by special casing implicit type conversions in BloomFilterAggregate instead?





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1160557436


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1073,6 +1074,91 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(Lead(children.head, children(1), children(2), ignoreNulls))
 
+      case "bloom_filter_agg" if fun.getArgumentsCount == 5 =>
+        // [col, catalogString: String, expectedNumItems: Long, numBits: Long, fpp: Double]
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        val dt = {
+          val ddl = children(1) match {
+            case StringLiteral(s) => s
+            case other =>
+              throw InvalidPlanInput(s"col dataType should be a literal string, but got $other")
+          }
+          DataType.fromDDL(ddl)
+        }
+        val col = dt match {
+          case IntegerType | ShortType | ByteType => Cast(children.head, LongType)
+          case LongType | StringType => children.head
+          case other =>
+            throw InvalidPlanInput(
+              s"Bloom filter only supports integral types, " +
+                s"and does not support type $other.")
+        }
+
+        val fpp = children(4) match {
+          case DoubleLiteral(d) => d
+          case _ =>
+            throw InvalidPlanInput("False positive must be double literal.")
+        }
+
+        if (fpp.isNaN) {
+          // Use expectedNumItems and numBits when `fpp.isNaN` if true.
+          // Check expectedNumItems > 0L
+          val expectedNumItemsExpr = children(2)
+          val expectedNumItems = expectedNumItemsExpr match {
+            case Literal(l: Long, LongType) => l
+            case _ =>
+              throw InvalidPlanInput("Expected insertions must be long literal.")
+          }
+          if (expectedNumItems <= 0L) {
+            throw InvalidPlanInput("Expected insertions must be positive.")
+          }
+          // Check numBits > 0L
+          val numBitsExpr = children(3)
+          val numBits = numBitsExpr match {
+            case Literal(l: Long, LongType) => l
+            case _ =>
+              throw InvalidPlanInput("Number of bits must be long literal.")
+          }
+          if (numBits <= 0L) {
+            throw InvalidPlanInput("Number of bits must be positive.")
+          }
+          // Create BloomFilterAggregate with expectedNumItemsExpr and numBitsExpr.
+          Some(
+            new BloomFilterAggregate(col, expectedNumItemsExpr, numBitsExpr)
+              .toAggregateExpression())
+
+        } else {
+          def optimalNumOfBits(n: Long, p: Double): Long =

Review Comment:
   This is a Java file, so `private[spark]` is not available.





[GitHub] [spark] LuciferYang commented on pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40352:
URL: https://github.com/apache/spark/pull/40352#issuecomment-1500146157

   In the last commit, I made `BloomFilterAggregate` explicitly support `IntegerType/ShortType/ByteType` and added the corresponding updaters, then removed passing `dataType` and adding cast nodes.
   
   




[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131893211


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1073,6 +1074,12 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(Lead(children.head, children(1), children(2), ignoreNulls))
 
+      case "bloom_filter_agg" if fun.getArgumentsCount == 3 =>
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        Some(
+          new BloomFilterAggregate(children.head, children(1), children(2))

Review Comment:
   > You will need to hash the input column.
   
   like `new BloomFilterAggregate(new XxHash64(Seq(children.head)), ....`
   
   But if we hash the input column, I think it will be hashed twice ... the existing test case 
   
   ```
   assert(0.until(1000).forall(filter1.mightContain))
   ```
   
   will not pass
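
   A toy sketch of why pre-hashing on the server would break client-side lookups. `ToyBloomFilter` and `DoubleHashingDemo` are hypothetical, the filter is not Spark's `BloomFilter`, and `preHash` is only a stand-in for something like `XxHash64`:

   ```scala
   import java.nio.ByteBuffer
   import java.util.BitSet
   import scala.util.hashing.MurmurHash3

   // Toy Bloom filter for illustration only; it is not Spark's BloomFilter.
   final class ToyBloomFilter(numBits: Int, numHashes: Int) {
     private val bits = new BitSet(numBits)

     private def indexes(item: Long): Seq[Int] = {
       val bytes = ByteBuffer.allocate(8).putLong(item).array()
       val h1 = MurmurHash3.bytesHash(bytes, 0)
       val h2 = MurmurHash3.bytesHash(bytes, h1)
       // Combine two hashes into numHashes bit positions.
       (1 to numHashes).map(i => Math.floorMod(h1 + i * h2, numBits))
     }

     def put(item: Long): Unit = indexes(item).foreach(i => bits.set(i))
     def mightContain(item: Long): Boolean = indexes(item).forall(i => bits.get(i))
   }

   object DoubleHashingDemo {
     // Stand-in for a server-side pre-hash; any deterministic hash works for the demo.
     def preHash(x: Long): Long =
       MurmurHash3.bytesHash(ByteBuffer.allocate(8).putLong(x).array(), 42).toLong

     // hashFirst = true models a server that hashes the column before aggregating.
     def build(values: Seq[Long], hashFirst: Boolean): ToyBloomFilter = {
       val filter = new ToyBloomFilter(numBits = 1 << 16, numHashes = 3)
       values.foreach(v => filter.put(if (hashFirst) preHash(v) else v))
       filter
     }
   }
   ```

   A filter built with `hashFirst = true` answers membership for `preHash(v)`, so a client probing with the raw `v` would only hit by chance collision, which is the failure mode described for `filter1.mightContain`.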





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1132197345


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1073,6 +1074,91 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(Lead(children.head, children(1), children(2), ignoreNulls))
 
+      case "bloom_filter_agg" if fun.getArgumentsCount == 5 =>
+        // [col, catalogString: String, expectedNumItems: Long, numBits: Long, fpp: Double]
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        val dt = {
+          val ddl = children(1) match {
+            case StringLiteral(s) => s
+            case other =>
+              throw InvalidPlanInput(s"col dataType should be a literal string, but got $other")
+          }
+          DataType.fromDDL(ddl)
+        }
+        val col = dt match {
+          case IntegerType | ShortType | ByteType => Cast(children.head, LongType)
+          case LongType | StringType => children.head
+          case other =>
+            throw InvalidPlanInput(
+              s"Bloom filter only supports integral types, " +
+                s"and does not support type $other.")
+        }
+
+        val fpp = children(4) match {
+          case DoubleLiteral(d) => d
+          case _ =>
+            throw InvalidPlanInput("False positive must be double literal.")
+        }
+
+        if (fpp.isNaN) {
+          // Use expectedNumItems and numBits when `fpp.isNaN` if true.
+          // Check expectedNumItems > 0L
+          val expectedNumItemsExpr = children(2)
+          val expectedNumItems = expectedNumItemsExpr match {
+            case Literal(l: Long, LongType) => l
+            case _ =>
+              throw InvalidPlanInput("Expected insertions must be long literal.")
+          }
+          if (expectedNumItems <= 0L) {
+            throw InvalidPlanInput("Expected insertions must be positive.")
+          }
+          // Check numBits > 0L
+          val numBitsExpr = children(3)
+          val numBits = numBitsExpr match {
+            case Literal(l: Long, LongType) => l
+            case _ =>
+              throw InvalidPlanInput("Number of bits must be long literal.")
+          }
+          if (numBits <= 0L) {
+            throw InvalidPlanInput("Number of bits must be positive.")
+          }
+          // Create BloomFilterAggregate with expectedNumItemsExpr and numBitsExpr.
+          Some(
+            new BloomFilterAggregate(col, expectedNumItemsExpr, numBitsExpr)
+              .toAggregateExpression())
+
+        } else {
+          def optimalNumOfBits(n: Long, p: Double): Long =

Review Comment:
   This method is a copy of the one in `BloomFilter`:
   
   https://github.com/apache/spark/blob/f8966e7eee1d7f2db7b376d557d5ff6658c80653/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java#L195-L206
   
   Can we make it public?
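
   For context, a sketch of the standard Bloom filter sizing formula, m = -n ln(p) / (ln 2)^2, which a helper named `optimalNumOfBits` would be expected to compute. It is written from the textbook formula and is an assumption about the linked code, not a copy of it; `BloomSizing` is a hypothetical name:

   ```scala
   object BloomSizing {
     // n: expected number of inserted items; p: target false positive probability.
     def optimalNumOfBits(n: Long, p: Double): Long = {
       require(n > 0L, "expected insertions must be positive")
       require(p > 0.0 && p < 1.0, "false positive probability must be in (0, 1)")
       // m = -n * ln(p) / (ln 2)^2, truncated to a whole number of bits.
       (-n * math.log(p) / (math.log(2) * math.log(2))).toLong
     }
   }
   ```

   Under this formula, a lower `p` for the same `n` always yields more bits, which is why the server needs either `fpp` or `numBits`, but not both.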
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1132194697


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala:
##########
@@ -176,4 +176,31 @@ class DataFrameStatSuite extends RemoteSparkSession {
     assert(sketch.relativeError() === 0.001)
     assert(sketch.confidence() === 0.99 +- 5e-3)
   }
+
+  // This test only verifies some basic requirements, more correctness tests can be found in
+  // `BloomFilterSuite` in project spark-sketch.
+  test("Bloom filter") {

Review Comment:
   Added a new test covering Long/Int/Short/Byte/String and invalid inputs





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131019335


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -265,7 +268,23 @@ object SparkConnectService {
   }
 
   private def newIsolatedSession(): SparkSession = {
-    SparkSession.active.newSession()
+    val session = SparkSession.active.newSession()
+    registerBloomFilterFunction(session)
+    session
+  }
+
+  private def registerBloomFilterFunction(session: SparkSession): Unit = {

Review Comment:
   If we register this here, the Python test:
   
   ```
       def test_sql_with_command(self):
           # SPARK-42705: spark.sql should return values from the command.
           self.assertEqual(
               self.connect.sql("show functions").collect(), self.spark.sql("show functions").collect()
           )
   ```
   
   will failed, so we can't use `BloomFilterAggregate` in this way?
   
   





[GitHub] [spark] hvanhovell commented on pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #40352:
URL: https://github.com/apache/spark/pull/40352#issuecomment-1671385089

   @LuciferYang can we restart this effort?
   
   I promise I will look at the proto PR :)




[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1132189985


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1073,6 +1074,12 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(Lead(children.head, children(1), children(2), ignoreNulls))
 
+      case "bloom_filter_agg" if fun.getArgumentsCount == 3 =>
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        Some(
+          new BloomFilterAggregate(children.head, children(1), children(2))

Review Comment:
   Refactored `BloomFilterAggregate` to support `StringType`.
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131995398


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala:
##########
@@ -584,6 +585,101 @@ final class DataFrameStatFunctions private[sql] (sparkSession: SparkSession, roo
     }
     CountMinSketch.readFrom(ds.head())
   }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, numBits, Double.NaN)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.4.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, numBits, Double.NaN)
+  }
+
+  private def buildBloomFilter(
+      col: Column,
+      expectedNumItems: Long,
+      numBits: Long,
+      fpp: Double): BloomFilter = {
+
+    def optimalNumOfBits(n: Long, p: Double): Long =

Review Comment:
   Do you mean we should pass all 3 parameters to the server side and then do the conversion (`optimalNumOfBits`) there?
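
   For reference, the conversion being discussed can be sketched as below. This is a minimal sketch assuming the standard Bloom filter sizing formula m = -n * ln(p) / (ln 2)^2. The object name `BloomFilterSizing` is hypothetical, and whether this runs on the client or the server is exactly the open question here.

   ```scala
   // Hypothetical helper, for illustration only; not the code in this PR.
   object BloomFilterSizing {
     // Standard sizing formula: m = -n * ln(p) / (ln 2)^2,
     // where n is the expected number of items and p the false positive probability.
     def optimalNumOfBits(n: Long, p: Double): Long = {
       require(n > 0, "expected number of items must be positive")
       require(p > 0.0 && p < 1.0, "false positive probability must be in (0, 1)")
       (-n * math.log(p) / (math.log(2) * math.log(2))).toLong
     }

     def main(args: Array[String]): Unit = {
       // A lower fpp needs more bits for the same number of items.
       println(optimalNumOfBits(1000L, 0.03))
       println(optimalNumOfBits(1000L, 0.001))
     }
   }
   ```

   If the conversion moves to the server, the client would only forward `expectedNumItems` together with either `fpp` or `numBits`.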





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131051613


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -265,7 +268,23 @@ object SparkConnectService {
   }
 
   private def newIsolatedSession(): SparkSession = {
-    SparkSession.active.newSession()
+    val session = SparkSession.active.newSession()
+    registerBloomFilterFunction(session)
+    session
+  }
+
+  private def registerBloomFilterFunction(session: SparkSession): Unit = {

Review Comment:
   Will change the approach.





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131019335


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -265,7 +268,23 @@ object SparkConnectService {
   }
 
   private def newIsolatedSession(): SparkSession = {
-    SparkSession.active.newSession()
+    val session = SparkSession.active.newSession()
+    registerBloomFilterFunction(session)
+    session
+  }
+
+  private def registerBloomFilterFunction(session: SparkSession): Unit = {

Review Comment:
   If we register the function here, the Python test:
   
   ```
       def test_sql_with_command(self):
           # SPARK-42705: spark.sql should return values from the command.
           self.assertEqual(
               self.connect.sql("show functions").collect(), self.spark.sql("show functions").collect()
           )
   ```
   
   will fail ...





[GitHub] [spark] hvanhovell commented on a diff in pull request #40352: [WIP][SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1131445842


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1073,6 +1074,12 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(Lead(children.head, children(1), children(2), ignoreNulls))
 
+      case "bloom_filter_agg" if fun.getArgumentsCount == 3 =>
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        Some(
+          new BloomFilterAggregate(children.head, children(1), children(2))

Review Comment:
   You will need to hash the input column.
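
   One possible reading of this, sketched under assumptions: if the aggregate only consumes 64-bit long values, a non-long input (for example a string) has to be mapped to a long first. The sketch below uses `scala.util.hashing.MurmurHash3` purely as a stand-in hash. The object `HashedInput`, the method `hash64`, and the seeds are hypothetical and are not what Spark uses.

   ```scala
   import scala.util.hashing.MurmurHash3

   // Hypothetical helper, for illustration only; not the code in this PR.
   object HashedInput {
     // Builds a 64-bit value from two 32-bit MurmurHash3 passes with different seeds.
     def hash64(value: String): Long = {
       val high = MurmurHash3.stringHash(value, 17)
       val low = MurmurHash3.stringHash(value, 31)
       // Mask the low half so sign extension does not overwrite the high half.
       (high.toLong << 32) | (low.toLong & 0xffffffffL)
     }

     def main(args: Array[String]): Unit = {
       // The same input always maps to the same long, so lookups stay consistent.
       println(hash64("spark") == hash64("spark"))
     }
   }
   ```

   The filter would then be built over the hashed longs, and any later membership check has to apply the same hash to the probe value.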





[GitHub] [spark] LuciferYang commented on a diff in pull request #40352: [SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40352:
URL: https://github.com/apache/spark/pull/40352#discussion_r1160620933


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala:
##########
@@ -584,6 +585,86 @@ final class DataFrameStatFunctions private[sql] (sparkSession: SparkSession, roo
     }
     CountMinSketch.readFrom(ds.head())
   }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.5.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param fpp
+   *   expected false positive probability of the filter.
+   * @since 3.5.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, fpp: Double): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, -1L, fpp)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param colName
+   *   name of the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.5.0
+   */
+  def bloomFilter(colName: String, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(Column(colName), expectedNumItems, numBits, Double.NaN)
+  }
+
+  /**
+   * Builds a Bloom filter over a specified column.
+   *
+   * @param col
+   *   the column over which the filter is built
+   * @param expectedNumItems
+   *   expected number of items which will be put into the filter.
+   * @param numBits
+   *   expected number of bits of the filter.
+   * @since 3.5.0
+   */
+  def bloomFilter(col: Column, expectedNumItems: Long, numBits: Long): BloomFilter = {
+    buildBloomFilter(col, expectedNumItems, numBits, Double.NaN)
+  }
+
+  private def buildBloomFilter(
+      col: Column,
+      expectedNumItems: Long,
+      numBits: Long,
+      fpp: Double): BloomFilter = {
+
+    val agg = if (!fpp.isNaN) {

Review Comment:
   Before the [change to always pass 3 parameters](https://github.com/apache/spark/pull/40352/commits/5bd616954aaef21b88b161e20e0f49bc067cae0c), the client always passed all 4 parameters.
   
   Now it passes (`col`, `expectedNumItems`, `fpp`) if `!fpp.isNaN`, otherwise (`col`, `expectedNumItems`, `numBits`).
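
   The selection rule described above can be sketched as follows. This is a simplified model, not the client code itself: the types `BloomFilterArgs`, `WithFpp`, and `WithNumBits` and the method `select` are hypothetical, and a plain column name stands in for `Column`. It relies on the convention visible in the diff that `fpp` is `Double.NaN` when the caller supplied `numBits`.

   ```scala
   // Hypothetical model of the three arguments sent to the server; illustration only.
   sealed trait BloomFilterArgs
   final case class WithFpp(colName: String, expectedNumItems: Long, fpp: Double)
       extends BloomFilterArgs
   final case class WithNumBits(colName: String, expectedNumItems: Long, numBits: Long)
       extends BloomFilterArgs

   object BloomFilterArgs {
     // fpp == NaN signals that the caller specified numBits instead of fpp.
     def select(
         colName: String,
         expectedNumItems: Long,
         numBits: Long,
         fpp: Double): BloomFilterArgs = {
       if (!fpp.isNaN) WithFpp(colName, expectedNumItems, fpp)
       else WithNumBits(colName, expectedNumItems, numBits)
     }
   }
   ```

   Either way exactly 3 values are sent, which matches the `fun.getArgumentsCount == 3` guard in the planner diff quoted earlier in this thread.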
   
   


