You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "peter-toth (via GitHub)" <gi...@apache.org> on 2023/08/31 16:21:30 UTC

[GitHub] [spark] peter-toth opened a new pull request, #42755: [SPARK-45034][SQL] Support deterministic mode function

peter-toth opened a new pull request, #42755:
URL: https://github.com/apache/spark/pull/42755

   ### What changes were proposed in this pull request?
   This PR adds a new argument to the `mode` aggregate function to provide deterministic results.
   
   ### Why are the changes needed?
   To make the function more user friendly.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, it adds a new argument to the `mode` function.
   
   ### How was this patch tested?
   Added new UTs.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] peter-toth commented on pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on PR #42755:
URL: https://github.com/apache/spark/pull/42755#issuecomment-1722906229

   Thanks all for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1325576026


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -35,24 +42,60 @@ import org.apache.spark.util.collection.OpenHashMap
        0-10
       > SELECT _FUNC_(col) FROM VALUES (0), (10), (10), (null), (null), (null) AS tab(col);
        10
+      > SELECT _FUNC_(col, false) FROM VALUES (-10), (0), (10) AS tab(col);
+       0
+      > SELECT _FUNC_(col, true) FROM VALUES (-10), (0), (10) AS tab(col);
+       -10
   """,
   group = "agg_funcs",
   since = "3.4.0")
 // scalastyle:on line.size.limit
 case class Mode(
     child: Expression,
     mutableAggBufferOffset: Int = 0,
-    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
-  with ImplicitCastInputTypes with UnaryLike[Expression] {
+    inputAggBufferOffset: Int = 0,
+    deterministicExpr: Expression = Literal.FalseLiteral)
+  extends TypedAggregateWithHashMapAsBuffer with ImplicitCastInputTypes
+    with BinaryLike[Expression] {
 
   def this(child: Expression) = this(child, 0, 0)
 
+  def this(child: Expression, deterministicExpr: Expression) = {
+    this(child, 0, 0, deterministicExpr)
+  }
+
+  @transient
+  protected lazy val deterministicResult = deterministicExpr.eval().asInstanceOf[Boolean]
+
+  override def left: Expression = child
+
+  override def right: Expression = deterministicExpr
+
   // Returns null for empty inputs
   override def nullable: Boolean = true
 
   override def dataType: DataType = child.dataType
 
-  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      return defaultCheck
+    }
+    if (!deterministicExpr.foldable) {
+      DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map(
+          "inputName" -> toSQLId("deterministic"),
+          "inputType" -> toSQLType(deterministicExpr.dataType),
+          "inputExpr" -> toSQLExpr(deterministicExpr)
+        )
+      )
+    } else {
+      TypeCheckSuccess

Review Comment:
   shall we fail for null deterministic flag?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1324657584


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -35,24 +42,57 @@ import org.apache.spark.util.collection.OpenHashMap
        0-10
       > SELECT _FUNC_(col) FROM VALUES (0), (10), (10), (null), (null), (null) AS tab(col);
        10
+      > SELECT _FUNC_(col, false) FROM VALUES (-10), (0), (10) AS tab(col);
+       0
+      > SELECT _FUNC_(col, true) FROM VALUES (-10), (0), (10) AS tab(col);
+       -10
   """,
   group = "agg_funcs",
   since = "3.4.0")
 // scalastyle:on line.size.limit
 case class Mode(
     child: Expression,
     mutableAggBufferOffset: Int = 0,
-    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
-  with ImplicitCastInputTypes with UnaryLike[Expression] {
+    inputAggBufferOffset: Int = 0,
+    deterministicResult: Expression = Literal.FalseLiteral)
+  extends TypedAggregateWithHashMapAsBuffer with ImplicitCastInputTypes
+    with BinaryLike[Expression] {
 
   def this(child: Expression) = this(child, 0, 0)
 
+  def this(child: Expression, deterministicResult: Expression) = {
+    this(child, 0, 0, deterministicResult)
+  }
+
+  override def left: Expression = child
+
+  override def right: Expression = deterministicResult
+
   // Returns null for empty inputs
   override def nullable: Boolean = true
 
   override def dataType: DataType = child.dataType
 
-  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      return defaultCheck
+    }
+    if (!deterministicResult.foldable) {
+      DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map(
+          "inputName" -> "deterministicResult",

Review Comment:
   This PR fixes other places: https://github.com/apache/spark/pull/42905



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1324756847


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -18,15 +18,22 @@
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, ImplicitCastInputTypes}
-import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, ImplicitCastInputTypes, Literal}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, UnaryLike}
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
 import org.apache.spark.sql.catalyst.util.GenericArrayData
-import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, DataType}
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
+import org.apache.spark.sql.errors.DataTypeErrors.{toSQLId, toSQLType}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType}
 import org.apache.spark.util.collection.OpenHashMap
 
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(col) - Returns the most frequent value for the values within `col`. NULL values are ignored. If all the values are NULL, or there are 0 rows, returns NULL.",
+  usage = """
+    _FUNC_(col[, deterministic]) - Returns the most frequent value for the values within `col`. NULL values are ignored. If all the values are NULL, or there are 0 rows, returns NULL.
+      When multiple values have the same greatest frequency then either any of values is returned if 'deterministic' is false or is not defined, or the lowest value is returned if 'deterministic' is true.""",

Review Comment:
   to be consistent to other places:
   ```suggestion
         When multiple values have the same greatest frequency then either any of values is returned if `deterministic` is false or is not defined, or the lowest value is returned if `deterministic` is true.""",
   ```



##########
sql/core/src/test/resources/sql-tests/results/group-by.sql.out:
##########
@@ -1121,3 +1121,108 @@ struct<d:int>
 -- !query output
 0
 2
+
+
+-- !query
+SELECT mode(col) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<mode(col, false):int>
+-- !query output
+0
+
+
+-- !query
+SELECT mode(col, false) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<mode(col, false):int>
+-- !query output
+0
+
+
+-- !query
+SELECT mode(col, true) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<mode(col, true):int>
+-- !query output
+-10
+
+
+-- !query
+SELECT mode(col, 'true') FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"true\"",
+    "inputType" : "\"STRING\"",
+    "paramIndex" : "2",
+    "requiredType" : "\"BOOLEAN\"",
+    "sqlExpr" : "\"mode(col, true)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 24,
+    "fragment" : "mode(col, 'true')"
+  } ]
+}
+
+
+-- !query
+SELECT mode(col, b) FROM VALUES (-10, false), (0, false), (10, false) AS tab(col, b)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputExpr" : "\"b\"",
+    "inputName" : "deterministicResult",
+    "inputType" : "\"BOOLEAN\"",
+    "sqlExpr" : "\"mode(col, b)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 19,
+    "fragment" : "mode(col, b)"
+  } ]
+}
+
+
+-- !query
+SELECT mode(col) FROM VALUES (map(1, 'a')) AS tab(col)
+-- !query schema
+struct<mode(col, false):map<int,string>>
+-- !query output
+{1:"a"}
+
+
+-- !query
+SELECT mode(col, false) FROM VALUES (map(1, 'a')) AS tab(col)
+-- !query schema
+struct<mode(col, false):map<int,string>>
+-- !query output
+{1:"a"}
+
+
+-- !query
+SELECT mode(col, true) FROM VALUES (map(1, 'a')) AS tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkIllegalArgumentException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_2005",
+  "messageParameters" : {
+    "dataType" : "PhysicalMapType"
+  }

Review Comment:
   We have the ticket SPARK-42841 for this already. Could be replaced separately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on PR #42755:
URL: https://github.com/apache/spark/pull/42755#issuecomment-1722538502

   +1, LGTM. Merging to master.
   Thank you, @peter-toth and @cloud-fan @srielau for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] peter-toth commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1325510653


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -18,15 +18,22 @@
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, ImplicitCastInputTypes}
-import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, ImplicitCastInputTypes, Literal}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, UnaryLike}
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
 import org.apache.spark.sql.catalyst.util.GenericArrayData
-import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, DataType}
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
+import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType}
 import org.apache.spark.util.collection.OpenHashMap
 
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(col) - Returns the most frequent value for the values within `col`. NULL values are ignored. If all the values are NULL, or there are 0 rows, returns NULL.",
+  usage = """
+    _FUNC_(col[, deterministicResult]) - Returns the most frequent value for the values within `col`. NULL values are ignored. If all the values are NULL, or there are 0 rows, returns NULL.

Review Comment:
   Ok, changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk closed pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk closed pull request #42755: [SPARK-45034][SQL] Support deterministic mode function
URL: https://github.com/apache/spark/pull/42755


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] peter-toth commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1325710585


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -35,24 +42,60 @@ import org.apache.spark.util.collection.OpenHashMap
        0-10
       > SELECT _FUNC_(col) FROM VALUES (0), (10), (10), (null), (null), (null) AS tab(col);
        10
+      > SELECT _FUNC_(col, false) FROM VALUES (-10), (0), (10) AS tab(col);
+       0
+      > SELECT _FUNC_(col, true) FROM VALUES (-10), (0), (10) AS tab(col);
+       -10
   """,
   group = "agg_funcs",
   since = "3.4.0")
 // scalastyle:on line.size.limit
 case class Mode(
     child: Expression,
     mutableAggBufferOffset: Int = 0,
-    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
-  with ImplicitCastInputTypes with UnaryLike[Expression] {
+    inputAggBufferOffset: Int = 0,
+    deterministicExpr: Expression = Literal.FalseLiteral)
+  extends TypedAggregateWithHashMapAsBuffer with ImplicitCastInputTypes
+    with BinaryLike[Expression] {
 
   def this(child: Expression) = this(child, 0, 0)
 
+  def this(child: Expression, deterministicExpr: Expression) = {
+    this(child, 0, 0, deterministicExpr)
+  }
+
+  @transient
+  protected lazy val deterministicResult = deterministicExpr.eval().asInstanceOf[Boolean]
+
+  override def left: Expression = child
+
+  override def right: Expression = deterministicExpr
+
   // Returns null for empty inputs
   override def nullable: Boolean = true
 
   override def dataType: DataType = child.dataType
 
-  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      return defaultCheck
+    }
+    if (!deterministicExpr.foldable) {
+      DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map(
+          "inputName" -> toSQLId("deterministic"),
+          "inputType" -> toSQLType(deterministicExpr.dataType),
+          "inputExpr" -> toSQLExpr(deterministicExpr)
+        )
+      )
+    } else {
+      TypeCheckSuccess

Review Comment:
   ok, fixed in https://github.com/apache/spark/pull/42755/commits/948b8d7d8e478e84bfc2fe02cba9a73c8e0fed7e



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1323022298


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -18,15 +18,22 @@
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, ImplicitCastInputTypes}
-import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, ImplicitCastInputTypes, Literal}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, UnaryLike}
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
 import org.apache.spark.sql.catalyst.util.GenericArrayData
-import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, DataType}
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
+import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType}
 import org.apache.spark.util.collection.OpenHashMap
 
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(col) - Returns the most frequent value for the values within `col`. NULL values are ignored. If all the values are NULL, or there are 0 rows, returns NULL.",
+  usage = """
+    _FUNC_(col[, deterministicResult]) - Returns the most frequent value for the values within `col`. NULL values are ignored. If all the values are NULL, or there are 0 rows, returns NULL.

Review Comment:
   +1 for `deterministic`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] peter-toth commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1325516552


##########
python/pyspark/sql/functions.py:
##########
@@ -1195,12 +1195,12 @@ def mode(col: "ColumnOrName") -> Column:
     ...     ("dotNET", 2013, 48000), ("Java", 2013, 30000)],
     ...     schema=("course", "year", "earnings"))
     >>> df.groupby("course").agg(mode("year")).show()
-    +------+----------+
-    |course|mode(year)|
-    +------+----------+
-    |  Java|      2012|
-    |dotNET|      2012|
-    +------+----------+
+    +------+-----------------+
+    |course|mode(year, false)|

Review Comment:
   Thanks! This is actually a good question and I think we shall.
   I've added the new param to both Scala and Python APIs in both SQL and Connect in the latest.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -35,24 +42,57 @@ import org.apache.spark.util.collection.OpenHashMap
        0-10
       > SELECT _FUNC_(col) FROM VALUES (0), (10), (10), (null), (null), (null) AS tab(col);
        10
+      > SELECT _FUNC_(col, false) FROM VALUES (-10), (0), (10) AS tab(col);
+       0
+      > SELECT _FUNC_(col, true) FROM VALUES (-10), (0), (10) AS tab(col);
+       -10
   """,
   group = "agg_funcs",
   since = "3.4.0")
 // scalastyle:on line.size.limit
 case class Mode(
     child: Expression,
     mutableAggBufferOffset: Int = 0,
-    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
-  with ImplicitCastInputTypes with UnaryLike[Expression] {
+    inputAggBufferOffset: Int = 0,
+    deterministicResult: Expression = Literal.FalseLiteral)
+  extends TypedAggregateWithHashMapAsBuffer with ImplicitCastInputTypes
+    with BinaryLike[Expression] {
 
   def this(child: Expression) = this(child, 0, 0)
 
+  def this(child: Expression, deterministicResult: Expression) = {
+    this(child, 0, 0, deterministicResult)
+  }
+
+  override def left: Expression = child
+
+  override def right: Expression = deterministicResult
+
   // Returns null for empty inputs
   override def nullable: Boolean = true
 
   override def dataType: DataType = child.dataType
 
-  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      return defaultCheck
+    }
+    if (!deterministicResult.foldable) {
+      DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map(
+          "inputName" -> "deterministicResult",

Review Comment:
   Ok, fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] peter-toth commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1325516552


##########
python/pyspark/sql/functions.py:
##########
@@ -1195,12 +1195,12 @@ def mode(col: "ColumnOrName") -> Column:
     ...     ("dotNET", 2013, 48000), ("Java", 2013, 30000)],
     ...     schema=("course", "year", "earnings"))
     >>> df.groupby("course").agg(mode("year")).show()
-    +------+----------+
-    |course|mode(year)|
-    +------+----------+
-    |  Java|      2012|
-    |dotNET|      2012|
-    +------+----------+
+    +------+-----------------+
+    |course|mode(year, false)|

Review Comment:
   Thanks! This is actually a good question and I think we shall.
   I've added the new param to both Scala and Python APIs in both SQL and Connect in the latest version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1323175490


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -35,24 +42,57 @@ import org.apache.spark.util.collection.OpenHashMap
        0-10
       > SELECT _FUNC_(col) FROM VALUES (0), (10), (10), (null), (null), (null) AS tab(col);
        10
+      > SELECT _FUNC_(col, false) FROM VALUES (-10), (0), (10) AS tab(col);
+       0
+      > SELECT _FUNC_(col, true) FROM VALUES (-10), (0), (10) AS tab(col);
+       -10
   """,
   group = "agg_funcs",
   since = "3.4.0")
 // scalastyle:on line.size.limit
 case class Mode(
     child: Expression,
     mutableAggBufferOffset: Int = 0,
-    inputAggBufferOffset: Int = 0) extends TypedAggregateWithHashMapAsBuffer
-  with ImplicitCastInputTypes with UnaryLike[Expression] {
+    inputAggBufferOffset: Int = 0,
+    deterministicResult: Expression = Literal.FalseLiteral)
+  extends TypedAggregateWithHashMapAsBuffer with ImplicitCastInputTypes
+    with BinaryLike[Expression] {
 
   def this(child: Expression) = this(child, 0, 0)
 
+  def this(child: Expression, deterministicResult: Expression) = {
+    this(child, 0, 0, deterministicResult)
+  }
+
+  override def left: Expression = child
+
+  override def right: Expression = deterministicResult
+
   // Returns null for empty inputs
   override def nullable: Boolean = true
 
   override def dataType: DataType = child.dataType
 
-  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      return defaultCheck
+    }
+    if (!deterministicResult.foldable) {
+      DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map(
+          "inputName" -> "deterministicResult",

Review Comment:
   It is an id, so, should be quoted by `toSQLId`



##########
python/pyspark/sql/functions.py:
##########
@@ -1195,12 +1195,12 @@ def mode(col: "ColumnOrName") -> Column:
     ...     ("dotNET", 2013, 48000), ("Java", 2013, 30000)],
     ...     schema=("course", "year", "earnings"))
     >>> df.groupby("course").agg(mode("year")).show()
-    +------+----------+
-    |course|mode(year)|
-    +------+----------+
-    |  Java|      2012|
-    |dotNET|      2012|
-    +------+----------+
+    +------+-----------------+
+    |course|mode(year, false)|

Review Comment:
   Shell we support new parameter in Python API too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1322632333


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -18,15 +18,22 @@
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, ImplicitCastInputTypes}
-import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, ImplicitCastInputTypes, Literal}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, UnaryLike}
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
 import org.apache.spark.sql.catalyst.util.GenericArrayData
-import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, DataType}
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
+import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType}
 import org.apache.spark.util.collection.OpenHashMap
 
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(col) - Returns the most frequent value for the values within `col`. NULL values are ignored. If all the values are NULL, or there are 0 rows, returns NULL.",
+  usage = """
+    _FUNC_(col[, deterministicResult]) - Returns the most frequent value for the values within `col`. NULL values are ignored. If all the values are NULL, or there are 0 rows, returns NULL.

Review Comment:
   Maybe, make the parameter name shorter? How about `static` or `deterministic`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1323031312


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -81,7 +121,16 @@ case class Mode(
       return null
     }
 
-    buffer.maxBy(_._2)._1
+    (if (right.eval().asInstanceOf[Boolean]) {

Review Comment:
   nit: we can follow `PercentileBase#percentages` and evaluate the foldable parameters in a lazy val.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] peter-toth commented on pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on PR #42755:
URL: https://github.com/apache/spark/pull/42755#issuecomment-1720914122

   Hmm, the failure seems unrelated but persistent...
   ```
   [info] - client INVALID_CURSOR.DISCONNECTED error is retried when other RPC preempts this one *** FAILED *** (385 milliseconds)
   [info]   org.apache.spark.SparkException: io.grpc.StatusRuntimeException: INTERNAL: [INVALID_CURSOR.POSITION_NOT_AVAILABLE] The cursor is invalid. The cursor position id c156089c-f861-467e-9031-31ee6f26d2b4 is no longer available at index 2.
   [info]   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toThrowable(GrpcExceptionConverter.scala:113)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srielau commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1322715522


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -18,15 +18,22 @@
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, ImplicitCastInputTypes}
-import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, ImplicitCastInputTypes, Literal}
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, UnaryLike}
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
 import org.apache.spark.sql.catalyst.util.GenericArrayData
-import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, DataType}
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
+import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType}
 import org.apache.spark.util.collection.OpenHashMap
 
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(col) - Returns the most frequent value for the values within `col`. NULL values are ignored. If all the values are NULL, or there are 0 rows, returns NULL.",
+  usage = """
+    _FUNC_(col[, deterministicResult]) - Returns the most frequent value for the values within `col`. NULL values are ignored. If all the values are NULL, or there are 0 rows, returns NULL.

Review Comment:
   +1 deterministic



##########
sql/core/src/test/resources/sql-tests/results/group-by.sql.out:
##########
@@ -1121,3 +1121,108 @@ struct<d:int>
 -- !query output
 0
 2
+
+
+-- !query
+SELECT mode(col) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<mode(col, false):int>
+-- !query output
+0
+
+
+-- !query
+SELECT mode(col, false) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<mode(col, false):int>
+-- !query output
+0
+
+
+-- !query
+SELECT mode(col, true) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<mode(col, true):int>
+-- !query output
+-10
+
+
+-- !query
+SELECT mode(col, 'true') FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"true\"",
+    "inputType" : "\"STRING\"",
+    "paramIndex" : "2",
+    "requiredType" : "\"BOOLEAN\"",
+    "sqlExpr" : "\"mode(col, true)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 24,
+    "fragment" : "mode(col, 'true')"
+  } ]
+}
+
+
+-- !query
+SELECT mode(col, b) FROM VALUES (-10, false), (0, false), (10, false) AS tab(col, b)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputExpr" : "\"b\"",
+    "inputName" : "deterministicResult",
+    "inputType" : "\"BOOLEAN\"",
+    "sqlExpr" : "\"mode(col, b)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 19,
+    "fragment" : "mode(col, b)"
+  } ]
+}
+
+
+-- !query
+SELECT mode(col) FROM VALUES (map(1, 'a')) AS tab(col)
+-- !query schema
+struct<mode(col, false):map<int,string>>
+-- !query output
+{1:"a"}
+
+
+-- !query
+SELECT mode(col, false) FROM VALUES (map(1, 'a')) AS tab(col)
+-- !query schema
+struct<mode(col, false):map<int,string>>
+-- !query output
+{1:"a"}
+
+
+-- !query
+SELECT mode(col, true) FROM VALUES (map(1, 'a')) AS tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkIllegalArgumentException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_2005",
+  "messageParameters" : {
+    "dataType" : "PhysicalMapType"
+  }

Review Comment:
   Would it be possible to fix that legacy error while we're in the neighborhood?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] peter-toth commented on a diff in pull request #42755: [SPARK-45034][SQL] Support deterministic mode function

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #42755:
URL: https://github.com/apache/spark/pull/42755#discussion_r1325512164


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -81,7 +121,16 @@ case class Mode(
       return null
     }
 
-    buffer.maxBy(_._2)._1
+    (if (right.eval().asInstanceOf[Boolean]) {

Review Comment:
   Sure. Fixed in latest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org