You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "stevomitric (via GitHub)" <gi...@apache.org> on 2024/03/17 10:09:05 UTC

[PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

stevomitric opened a new pull request, #45549:
URL: https://github.com/apache/spark/pull/45549

   [DRAFT] Until PR https://github.com/apache/spark/pull/45069 becomes resolved
   
   ### What changes were proposed in this pull request?
   Changes proposed in this PR include:
   - Relaxed checks that prevent aggregating of map types
   - Added new analyzer rule that uses `map_sort` proposed in [this PR](https://github.com/apache/spark/pull/45069)
   - Created codegen that compares two sorted maps
   
   
   ### Why are the changes needed?
   Adding new functionality to GROUP BY map types
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, ability to use `GROUP BY MapType`
   
   
   ### How was this patch tested?
   With new UTs
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537234205


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:
##########
@@ -722,12 +722,74 @@ class CodegenContext extends Logging {
           }
         """
       s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
+    case map: MapType =>
+      val compareFunc = freshName("compareMapData")
+      val funcCode = genCompMapData(map.keyType, map.valueType, compareFunc)
+      s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
     case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
     case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
     case _ =>
       throw QueryExecutionErrors.cannotGenerateCodeForIncomparableTypeError("compare", dataType)
   }
 
+  private def genCompMapData(keyType: DataType,
+    valueType: DataType, compareFunc : String): String = {
+    val keyArrayA = freshName("keyArrayA")
+    val keyArrayB = freshName("keyArrayB")
+    val valueArrayA = freshName("valueArrayA")
+    val valueArrayB = freshName("valueArrayB")
+    val minLength = freshName("minLength")
+    s"""

Review Comment:
   nit: use this Scala feature to avoid manual indentation control
   ```
   """
     |aa
     |bb
     |""".stripMargin
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537236592


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala:
##########
@@ -98,9 +98,7 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
     case FloatType | DoubleType => true
     case StructType(fields) => fields.exists(f => needNormalize(f.dataType))
     case ArrayType(et, _) => needNormalize(et)
-    // Currently MapType is not comparable and analyzer should fail earlier if this case happens.
-    case _: MapType =>
-      throw SparkException.internalError("grouping/join/window partition keys cannot be map type.")
+    case MapType(_, vt, _) => needNormalize(vt)

Review Comment:
   we need to check key type as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stevomitric (via GitHub)" <gi...@apache.org>.
stevomitric commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537492769


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:
##########
@@ -722,12 +722,74 @@ class CodegenContext extends Logging {
           }
         """
       s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
+    case map: MapType =>
+      val compareFunc = freshName("compareMapData")
+      val funcCode = genCompMapData(map.keyType, map.valueType, compareFunc)
+      s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
     case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
     case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
     case _ =>
       throw QueryExecutionErrors.cannotGenerateCodeForIncomparableTypeError("compare", dataType)
   }
 
+  private def genCompMapData(keyType: DataType,
+    valueType: DataType, compareFunc : String): String = {
+    val keyArrayA = freshName("keyArrayA")
+    val keyArrayB = freshName("keyArrayB")
+    val valueArrayA = freshName("valueArrayA")
+    val valueArrayB = freshName("valueArrayB")
+    val minLength = freshName("minLength")
+    s"""

Review Comment:
   Was following same logic for `genComp.ArrayType`. Refactored to use `stripMargin` (we should probably do the same for `ArrayType` as well)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stevomitric (via GitHub)" <gi...@apache.org>.
stevomitric commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537509363


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala:
##########
@@ -144,6 +142,14 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
       val function = normalize(lv)
       KnownFloatingPointNormalized(ArrayTransform(expr, LambdaFunction(function, Seq(lv))))
 
+    case _ if expr.dataType.isInstanceOf[MapType] =>
+      val MapType(kt, vt, containsNull) = expr.dataType
+      val keys = NamedLambdaVariable("arg", kt, containsNull)
+      val values = NamedLambdaVariable("arg", vt, containsNull)
+      val function = normalize(values)

Review Comment:
   Do we need to normalize keys? Would a conflict arise for a map that looks like this: `Map(0.0 -> 1, -0.0 -> 2)` as we don't support duplicate keys and `-0.0` would be reduced to `0.0`. Aren't keys normalized on map creation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stevomitric (via GitHub)" <gi...@apache.org>.
stevomitric commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1538952282


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala:
##########
@@ -144,6 +142,14 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
       val function = normalize(lv)
       KnownFloatingPointNormalized(ArrayTransform(expr, LambdaFunction(function, Seq(lv))))
 
+    case _ if expr.dataType.isInstanceOf[MapType] =>
+      val MapType(kt, vt, containsNull) = expr.dataType
+      val keys = NamedLambdaVariable("arg", kt, containsNull)
+      val values = NamedLambdaVariable("arg", vt, containsNull)
+      val function = normalize(values)

Review Comment:
   Created a [separate PR](https://github.com/apache/spark/pull/45721) for map normalization.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45549:
URL: https://github.com/apache/spark/pull/45549#issuecomment-2022470114

   thanks, merging to master!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45549:
URL: https://github.com/apache/spark/pull/45549#issuecomment-2021658806

   <img width="710" alt="image" src="https://github.com/apache/spark/assets/3182036/21d4f331-20f0-4fc0-b556-3ffcdfe604a3">
   there are still test failures


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stefankandic (via GitHub)" <gi...@apache.org>.
stefankandic commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1535340224


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -2469,3 +2470,25 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
       }
   }
 }
+
+/**
+ * Adds MapSort to group expressions containing map columns, as the key/value paris need to be
+ * in the correct order before grouping:
+ * SELECT COUNT(*) FROM TABLE GROUP BY map_column =>
+ * SELECT COUNT(*) FROM TABLE GROUP BY map_sort(map_column)
+ */
+object InsertMapSortInGroupingExpressions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(AGGREGATE), ruleId) {
+    case a @ Aggregate(groupingExpr, x, b) =>
+      val newGrouping = groupingExpr.map { expr =>
+        (expr, expr.dataType) match {

Review Comment:
   i know i'm the one who suggested it but on a second thought it would probably be more readable to just say
   
   ```
   if (!expr.instanceOf[MapSort] && expr.dataType.instanceOf[MapType]) {
     MapSort(expr)
   else {
     expr
   }
   ```
   
   but I'm fine if you want to keep it as is.
   
   Or even just simplify it even further if we put it inside a once only batch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #45549: [SPARK-47430][SQL] Support GROUP BY for MapType
URL: https://github.com/apache/spark/pull/45549


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stefankandic (via GitHub)" <gi...@apache.org>.
stefankandic commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1535334643


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala:
##########
@@ -144,6 +142,17 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
       val function = normalize(lv)
       KnownFloatingPointNormalized(ArrayTransform(expr, LambdaFunction(function, Seq(lv))))
 
+    case _ if expr.dataType.isInstanceOf[MapType] =>
+      val MapType(kt, vt, containsNull) = expr.dataType
+      val lv1 = NamedLambdaVariable("arg", kt, containsNull)
+      val lv2 = NamedLambdaVariable("arg", vt, containsNull)
+      val functionL1 = normalize(lv1)
+      val functionL2 = normalize(lv2)
+      KnownFloatingPointNormalized(

Review Comment:
   if i'm reading this correctly this will return an array and not a map - we should probably want to do normalization on keys and values separately and then create a new map out of those right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stefankandic (via GitHub)" <gi...@apache.org>.
stefankandic commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1535325881


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala:
##########
@@ -98,9 +98,7 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
     case FloatType | DoubleType => true
     case StructType(fields) => fields.exists(f => needNormalize(f.dataType))
     case ArrayType(et, _) => needNormalize(et)
-    // Currently MapType is not comparable and analyzer should fail earlier if this case happens.
-    case _: MapType =>
-      throw SparkException.internalError("grouping/join/window partition keys cannot be map type.")
+    case MapType(kt, vt, _) => needNormalize(kt) && needNormalize(vt)

Review Comment:
   shouldn't we check for either and not both?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stefankandic (via GitHub)" <gi...@apache.org>.
stefankandic commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1535329950


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala:
##########
@@ -98,9 +98,7 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
     case FloatType | DoubleType => true
     case StructType(fields) => fields.exists(f => needNormalize(f.dataType))
     case ArrayType(et, _) => needNormalize(et)
-    // Currently MapType is not comparable and analyzer should fail earlier if this case happens.
-    case _: MapType =>
-      throw SparkException.internalError("grouping/join/window partition keys cannot be map type.")
+    case MapType(kt, vt, _) => needNormalize(kt) && needNormalize(vt)

Review Comment:
   also we should probably add some tests for cases where the normalization is needed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537238434


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -196,6 +196,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
       ReplaceDeduplicateWithAggregate) ::
     Batch("Aggregate", fixedPoint,
       RemoveLiteralFromGroupExpressions,
+      InsertMapSortInGroupingExpressions,

Review Comment:
   I think we should run this rule near where we run `NormalizeFloatingNumbers`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -2469,3 +2470,24 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
       }
   }
 }
+
+/**
+ * Adds MapSort to group expressions containing map columns, as the key/value paris need to be
+ * in the correct order before grouping:
+ * SELECT COUNT(*) FROM TABLE GROUP BY map_column =>
+ * SELECT COUNT(*) FROM TABLE GROUP BY map_sort(map_column)
+ */
+object InsertMapSortInGroupingExpressions extends Rule[LogicalPlan] {

Review Comment:
   we should add a new file for new rules



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stevomitric (via GitHub)" <gi...@apache.org>.
stevomitric commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537509363


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala:
##########
@@ -144,6 +142,14 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
       val function = normalize(lv)
       KnownFloatingPointNormalized(ArrayTransform(expr, LambdaFunction(function, Seq(lv))))
 
+    case _ if expr.dataType.isInstanceOf[MapType] =>
+      val MapType(kt, vt, containsNull) = expr.dataType
+      val keys = NamedLambdaVariable("arg", kt, containsNull)
+      val values = NamedLambdaVariable("arg", vt, containsNull)
+      val function = normalize(values)

Review Comment:
   Do we need to normalize keys? Would a conflict arise for a map that looks like this: `Map(0.0 -> 1, -0.0 -> 2)` as we don't support duplicate keys and `-0.0` would be reduced to `0.0`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537847562


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -244,7 +244,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
       RemoveRedundantAliases,
       RemoveNoopOperators) :+
     // This batch must be executed after the `RewriteSubquery` batch, which creates joins.
-    Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers) :+
+    Batch("NormalizeFloatingNumbers", Once,
+      InsertMapSortInGroupingExpressions,

Review Comment:
   can we create a new batch for this rule?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537849913


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##########
@@ -2155,8 +2155,8 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
-  test("SPARK-46536 Support GROUP BY CalendarIntervalType") {
-    val numRows = 50
+  private def assertAggregateOnDataframe(dfSeq: Seq[DataFrame],

Review Comment:
   I'd rather test one DataFrame at a time, and the caller calls `assertAggregateOnDataframe` multiple times.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stefankandic (via GitHub)" <gi...@apache.org>.
stefankandic commented on PR #45549:
URL: https://github.com/apache/spark/pull/45549#issuecomment-2014666179

   can you merge the latest master into your branch so we'll only see the group by map changes?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537231191


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala:
##########
@@ -193,15 +193,6 @@ object ExprUtils extends QueryErrorsBase {
           messageParameters = Map("sqlExpr" -> expr.sql))
       }
 
-      // Check if the data type of expr is orderable.
-      if (expr.dataType.existsRecursively(_.isInstanceOf[MapType])) {
-        expr.failAnalysis(
-          errorClass = "GROUP_EXPRESSION_TYPE_IS_NOT_ORDERABLE",

Review Comment:
   we should also remove the error from `error-classes.json`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stefankandic (via GitHub)" <gi...@apache.org>.
stefankandic commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1535329950


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala:
##########
@@ -98,9 +98,7 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
     case FloatType | DoubleType => true
     case StructType(fields) => fields.exists(f => needNormalize(f.dataType))
     case ArrayType(et, _) => needNormalize(et)
-    // Currently MapType is not comparable and analyzer should fail earlier if this case happens.
-    case _: MapType =>
-      throw SparkException.internalError("grouping/join/window partition keys cannot be map type.")
+    case MapType(kt, vt, _) => needNormalize(kt) && needNormalize(vt)

Review Comment:
   also we should probably add some tests for the normalization



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537806223


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:
##########
@@ -722,12 +702,76 @@ class CodegenContext extends Logging {
           }
         """
       s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
+    case map: MapType =>
+      val compareFunc = freshName("compareMapData")
+      val funcCode = genCompMapData(map.keyType, map.valueType, compareFunc)
+      s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
     case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
     case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
     case _ =>
       throw QueryExecutionErrors.cannotGenerateCodeForIncomparableTypeError("compare", dataType)
   }
 
+  private def genCompMapData(
+      keyType: DataType,
+      valueType: DataType,
+      compareFunc : String): String = {

Review Comment:
   ```suggestion
         compareFunc: String): String = {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stevomitric (via GitHub)" <gi...@apache.org>.
stevomitric commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537482265


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala:
##########
@@ -193,15 +193,6 @@ object ExprUtils extends QueryErrorsBase {
           messageParameters = Map("sqlExpr" -> expr.sql))
       }
 
-      // Check if the data type of expr is orderable.
-      if (expr.dataType.existsRecursively(_.isInstanceOf[MapType])) {
-        expr.failAnalysis(
-          errorClass = "GROUP_EXPRESSION_TYPE_IS_NOT_ORDERABLE",

Review Comment:
   \+ modified a test inside `AnalysisErrorSuite.scala` that uses it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stevomitric (via GitHub)" <gi...@apache.org>.
stevomitric commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1535672069


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala:
##########
@@ -98,9 +98,7 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
     case FloatType | DoubleType => true
     case StructType(fields) => fields.exists(f => needNormalize(f.dataType))
     case ArrayType(et, _) => needNormalize(et)
-    // Currently MapType is not comparable and analyzer should fail earlier if this case happens.
-    case _: MapType =>
-      throw SparkException.internalError("grouping/join/window partition keys cannot be map type.")
+    case MapType(kt, vt, _) => needNormalize(kt) && needNormalize(vt)

Review Comment:
   Leaving it only for value types as discussed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "stevomitric (via GitHub)" <gi...@apache.org>.
stevomitric commented on PR #45549:
URL: https://github.com/apache/spark/pull/45549#issuecomment-2022196660

   > <img alt="image" width="710" src="https://private-user-images.githubusercontent.com/3182036/317074865-21d4f331-20f0-4fc0-b556-3ffcdfe604a3.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MTE1MjgyODgsIm5iZiI6MTcxMTUyNzk4OCwicGF0aCI6Ii8zMTgyMDM2LzMxNzA3NDg2NS0yMWQ0ZjMzMS0yMGYwLTRmYzAtYjU1Ni0zZmZjZGZlNjA0YTMucG5nP1gtQW16LUFsZ29yaXRobT1BV1M0LUhNQUMtU0hBMjU2JlgtQW16LUNyZWRlbnRpYWw9QUtJQVZDT0RZTFNBNTNQUUs0WkElMkYyMDI0MDMyNyUyRnVzLWVhc3QtMSUyRnMzJTJGYXdzNF9yZXF1ZXN0JlgtQW16LURhdGU9MjAyNDAzMjdUMDgyNjI4WiZYLUFtei1FeHBpcmVzPTMwMCZYLUFtei1TaWduYXR1cmU9MDM5NWJhYTRjMDEzNzA3ZDYwMTA0NzhjYzExZGQ1Y2U5OGQ2NWRhNTA3YzZmY2I5ZjhmYWIxMDdiOWIyMjQ3YyZYLUFtei1TaWduZWRIZWFkZXJzPWhvc3QmYWN0b3JfaWQ9MCZrZXlfaWQ9MCZyZXBvX2lkPTAifQ.Rlrcrk1FD3yi68-IrvAmhqJ3j3yLLjKZsUopEoKkajI">
   > there are still test failures
   
   Build should be fixed now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537232100


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:
##########
@@ -722,12 +722,74 @@ class CodegenContext extends Logging {
           }
         """
       s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
+    case map: MapType =>
+      val compareFunc = freshName("compareMapData")
+      val funcCode = genCompMapData(map.keyType, map.valueType, compareFunc)
+      s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
     case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
     case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
     case _ =>
       throw QueryExecutionErrors.cannotGenerateCodeForIncomparableTypeError("compare", dataType)
   }
 
+  private def genCompMapData(keyType: DataType,
+    valueType: DataType, compareFunc : String): String = {

Review Comment:
   ```suggestion
     private def genCompMapData(
         keyType: DataType,
         valueType: DataType,
         compareFunc : String): String = {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537809292


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:
##########
@@ -722,12 +702,76 @@ class CodegenContext extends Logging {
           }
         """
       s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
+    case map: MapType =>
+      val compareFunc = freshName("compareMapData")
+      val funcCode = genCompMapData(map.keyType, map.valueType, compareFunc)
+      s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
     case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
     case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
     case _ =>
       throw QueryExecutionErrors.cannotGenerateCodeForIncomparableTypeError("compare", dataType)
   }
 
+  private def genCompMapData(
+      keyType: DataType,
+      valueType: DataType,
+      compareFunc : String): String = {
+    val keyArrayA = freshName("keyArrayA")
+    val keyArrayB = freshName("keyArrayB")
+    val valueArrayA = freshName("valueArrayA")
+    val valueArrayB = freshName("valueArrayB")
+    val minLength = freshName("minLength")
+    s"""
+       |public int $compareFunc(MapData a, MapData b) {
+       |  int lengthA = a.numElements();
+       |  int lengthB = b.numElements();
+       |  ArrayData $keyArrayA = a.keyArray();
+       |  ArrayData $valueArrayA = a.valueArray();
+       |  ArrayData $keyArrayB = b.keyArray();
+       |  ArrayData $valueArrayB = b.valueArray();

Review Comment:
   do the above 4 variables need to use `freshName`? They are just local variables in this method.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:
##########
@@ -722,12 +702,76 @@ class CodegenContext extends Logging {
           }
         """
       s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
+    case map: MapType =>
+      val compareFunc = freshName("compareMapData")
+      val funcCode = genCompMapData(map.keyType, map.valueType, compareFunc)
+      s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
     case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
     case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
     case _ =>
       throw QueryExecutionErrors.cannotGenerateCodeForIncomparableTypeError("compare", dataType)
   }
 
+  private def genCompMapData(
+      keyType: DataType,
+      valueType: DataType,
+      compareFunc : String): String = {
+    val keyArrayA = freshName("keyArrayA")
+    val keyArrayB = freshName("keyArrayB")
+    val valueArrayA = freshName("valueArrayA")
+    val valueArrayB = freshName("valueArrayB")
+    val minLength = freshName("minLength")
+    s"""
+       |public int $compareFunc(MapData a, MapData b) {
+       |  int lengthA = a.numElements();
+       |  int lengthB = b.numElements();
+       |  ArrayData $keyArrayA = a.keyArray();
+       |  ArrayData $valueArrayA = a.valueArray();
+       |  ArrayData $keyArrayB = b.keyArray();
+       |  ArrayData $valueArrayB = b.valueArray();
+       |  int $minLength = (lengthA > lengthB) ? lengthB : lengthA;

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537803505


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala:
##########
@@ -144,6 +142,14 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
       val function = normalize(lv)
       KnownFloatingPointNormalized(ArrayTransform(expr, LambdaFunction(function, Seq(lv))))
 
+    case _ if expr.dataType.isInstanceOf[MapType] =>
+      val MapType(kt, vt, containsNull) = expr.dataType
+      val keys = NamedLambdaVariable("arg", kt, containsNull)
+      val values = NamedLambdaVariable("arg", vt, containsNull)
+      val function = normalize(values)

Review Comment:
   You are right, this should be handled when creating the map. We should fix `ArrayBasedMapBuilder` to handle floating points well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537236100


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:
##########
@@ -722,12 +722,74 @@ class CodegenContext extends Logging {
           }
         """
       s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
+    case map: MapType =>
+      val compareFunc = freshName("compareMapData")
+      val funcCode = genCompMapData(map.keyType, map.valueType, compareFunc)
+      s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
     case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
     case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
     case _ =>
       throw QueryExecutionErrors.cannotGenerateCodeForIncomparableTypeError("compare", dataType)
   }
 
+  private def genCompMapData(keyType: DataType,
+    valueType: DataType, compareFunc : String): String = {
+    val keyArrayA = freshName("keyArrayA")
+    val keyArrayB = freshName("keyArrayB")
+    val valueArrayA = freshName("valueArrayA")
+    val valueArrayB = freshName("valueArrayB")
+    val minLength = freshName("minLength")
+    s"""
+      public int $compareFunc(MapData a, MapData b) {
+        int lengthA = a.numElements();
+        int lengthB = b.numElements();
+        ArrayData $keyArrayA = a.keyArray();
+        ArrayData $valueArrayA = a.valueArray();
+        ArrayData $keyArrayB = b.keyArray();
+        ArrayData $valueArrayB = b.valueArray();
+        int $minLength = (lengthA > lengthB) ? lengthB : lengthA;
+        for (int i = 0; i < $minLength; i++) {
+          ${genCompElementsAt(keyArrayA, keyArrayB, "i", keyType)}
+          ${genCompElementsAt(valueArrayA, valueArrayB, "i", valueType)}
+        }
+
+        if (lengthA < lengthB) {
+          return -1;
+        } else if (lengthA > lengthB) {
+          return 1;
+        }
+        return 0;
+      }
+    """
+  }
+
+  private def genCompElementsAt(arrayA: String, arrayB: String, i: String,

Review Comment:
   can we share some code between this and the array type comparison codegen?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47430][SQL] Support GROUP BY for MapType [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45549:
URL: https://github.com/apache/spark/pull/45549#discussion_r1537237701


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala:
##########
@@ -144,6 +142,14 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
       val function = normalize(lv)
       KnownFloatingPointNormalized(ArrayTransform(expr, LambdaFunction(function, Seq(lv))))
 
+    case _ if expr.dataType.isInstanceOf[MapType] =>
+      val MapType(kt, vt, containsNull) = expr.dataType
+      val keys = NamedLambdaVariable("arg", kt, containsNull)
+      val values = NamedLambdaVariable("arg", vt, containsNull)
+      val function = normalize(values)

Review Comment:
   why don't we normalize map keys?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org