Posted to reviews@spark.apache.org by "LuciferYang (via GitHub)" <gi...@apache.org> on 2023/09/06 08:29:41 UTC

[GitHub] [spark] LuciferYang commented on a diff in pull request #42757: [SPARK-45036][SQL] SPJ: Simplify the logic to handle partially clustered distribution

LuciferYang commented on code in PR #42757:
URL: https://github.com/apache/spark/pull/42757#discussion_r1316926504


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala:
##########
@@ -143,32 +143,25 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
         // also sort the input partitions according to their partition key order. This ensures
         // a canonical order from both sides of a bucketed join, for example.
         val partitionDataTypes = expressions.map(_.dataType)
-        val partitionOrdering: Ordering[(InternalRow, Seq[InputPartition])] = {
+        val partitionOrdering: Ordering[(InternalRow, InputPartition)] = {
           RowOrdering.createNaturalAscendingOrdering(partitionDataTypes).on(_._1)
         }
-
-        val partitions = if (groupSplits) {
-          // Group the splits by their partition value
-          results
+        val sortedKeyToPartitions = results.sorted(partitionOrdering)
+        val groupedPartitions = sortedKeyToPartitions
             .map(t => (InternalRowComparableWrapper(t._1, expressions), t._2))
             .groupBy(_._1)

Review Comment:
   The problem likely comes from this `groupBy`: the iteration order of the `Map` it returns is not specified, and in practice it differs between Scala 2.12 and Scala 2.13.
   
   For example, for `val input = Seq((50,50),(51,51),(52,52))`, `input.groupBy(_._1).toSeq` returns `Vector((50,List((50,50))), (51,List((51,51))), (52,List((52,52))))` on Scala 2.12.18, but `List((52,List((52,52))), (50,List((50,50))), (51,List((51,51))))` on Scala 2.13.8. On Scala 2.13.8 the groups come out in a different order.
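   A minimal Scala sketch of why sorting before `groupBy` (as the diff does with `results.sorted(partitionOrdering)`) is not enough on its own. It uses plain `Int` keys and `String` values as hypothetical stand-ins for the partition keys and `InputPartition`s, and it only checks behavior that holds on both Scala versions: each group keeps the input's relative order, while the order of the groups is left to the `Map` implementation.

   ```scala
   // Hypothetical stand-ins: Int keys play the role of partition keys and
   // String values the role of InputPartitions (not Spark's actual types).
   object GroupByOrderSketch {
     // Already sorted by key, mirroring `sortedKeyToPartitions` in the diff.
     val sortedInput: Seq[(Int, String)] =
       Seq((50, "a"), (51, "b1"), (51, "b2"), (52, "c"))

     // groupBy returns an immutable Map whose iteration order is unspecified.
     val grouped: Map[Int, Seq[(Int, String)]] = sortedInput.groupBy(_._1)

     // Inside a group, elements keep their relative order from the input ...
     val group51: Seq[(Int, String)] = grouped(51)

     // ... but the order of the groups may differ between Scala versions,
     // so only the set of keys is safe to compare without re-sorting.
     val keySet: Set[Int] = grouped.keySet
   }
   ```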
   
   Possible fixes:
   1. Replace `groupBy` with an order-preserving alternative, such as a `foldLeft` into a `LinkedHashMap`.
   2. Re-sort `groupedPartitions` by partition key after grouping.
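   A rough sketch of both options, again with hypothetical `Int`/`String` stand-ins rather than `InternalRowComparableWrapper` and `InputPartition`; the object and method names are illustrative, not Spark code.

   ```scala
   import scala.collection.mutable

   // Hypothetical helpers: K stands in for the partition key, V for InputPartition.
   object GroupingSketch {

     // Option 1: group into an insertion-ordered LinkedHashMap via foldLeft,
     // so groups come out in the order their keys first appear in the input.
     def groupPreservingOrder[K, V](input: Seq[(K, V)]): List[(K, List[V])] = {
       val grouped = input.foldLeft(mutable.LinkedHashMap.empty[K, mutable.ArrayBuffer[V]]) {
         case (acc, (k, v)) =>
           // Append v to the buffer for k, creating the buffer on first sight of k.
           acc.getOrElseUpdate(k, mutable.ArrayBuffer.empty[V]) += v
           acc
       }
       grouped.iterator.map { case (k, vs) => (k, vs.toList) }.toList
     }

     // Option 2: keep groupBy, then sort the resulting groups by key.
     def groupThenSort[K: Ordering, V](input: Seq[(K, V)]): List[(K, List[V])] =
       input.groupBy(_._1).iterator
         .map { case (k, kvs) => (k, kvs.map(_._2).toList) }
         .toList
         .sortBy(_._1)
   }
   ```

   Option 1 yields groups in first-appearance order, so it relies on the input already being sorted (as `sortedKeyToPartitions` is in the diff). Option 2 orders the groups explicitly and needs an `Ordering` on the key; in the Spark code that would presumably be built from the wrapped row, for example via `partitionOrdering`.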
   
   


