You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "zhengchenyu (via GitHub)" <gi...@apache.org> on 2023/12/27 12:45:01 UTC

[PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

zhengchenyu opened a new pull request, #44512:
URL: https://github.com/apache/spark/pull/44512

   ### What changes were proposed in this pull request?
   
   After the shuffle reader obtains the block, it will first perform a combine operation, and then perform a sort operation. It is known that both combine and sort may generate temporary files, so the performance may be poor when both sort and combine are used. In fact, combine operations can be performed during the sort process, and we can avoid the combine spill file.
   See  https://issues.apache.org/jira/browse/SPARK-46512 for details
   
   ### Why are the changes needed?
   
   Reduce the number of spills to disk when both sort and combine are used
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   spark examples run in local and yarn mode.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44512:
URL: https://github.com/apache/spark/pull/44512#discussion_r1460207780


##########
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala:
##########
@@ -111,31 +111,50 @@ private[spark] class BlockStoreShuffleReader[K, C](
     // An interruptible iterator must be used here in order to support task cancellation
     val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
 
-    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
+    // Sort the output if there is a sort ordering defined.
+    var aggregated = false
+    // The type of the value cannot be determined here, maybe the type of value
+    // or the type of combined value.
+    val sortedIter: Iterator[Product2[K, Nothing]] = dep.keyOrdering match {
+      case Some(keyOrd: Ordering[K]) =>
+        // Create an ExternalSorter to sort the data.
+        val sorter: ExternalSorter[K, _, C] = if (dep.aggregator.isDefined) {
+          aggregated = true
+          if (dep.mapSideCombine) {
+            new ExternalSorter[K, C, C](context,
+              Option(new Aggregator[K, C, C](identity,
+                dep.aggregator.get.mergeCombiners,
+                dep.aggregator.get.mergeCombiners)),
+              ordering = Some(keyOrd), serializer = dep.serializer)
+          } else {
+            new ExternalSorter[K, Nothing, C](context,
+              dep.aggregator.asInstanceOf[Option[Aggregator[K, Nothing, C]]],
+              ordering = Some(keyOrd), serializer = dep.serializer)
+          }
+        } else {
+          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd),
+            serializer = dep.serializer)
+        }
+        sorter.insertAllAndUpdateMetrics(interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]).
+          asInstanceOf[Iterator[(K, Nothing)]]
+      case None =>
+        interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+    }
+
+    val resultIter: Iterator[Product2[K, C]] = if (!aggregated && dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
         // We are reading values that are already combined
-        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
+        val combinedKeyValuesIterator = sortedIter.asInstanceOf[Iterator[(K, C)]]
         dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
       } else {
         // We don't know the value type, but also don't care -- the dependency *should*
         // have made sure its compatible w/ this aggregator, which will convert the value
         // type to the combined type C
-        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+        val keyValuesIterator = sortedIter.asInstanceOf[Iterator[(K, Nothing)]]
         dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
       }
     } else {
-      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
-    }

Review Comment:
   We can drop the `aggregated` flag.
   
   In other words, we can make this simply:
   
   ```
   val resultIter = {
     if (dep.keyOrdering.isDefined) {
       // whatever is in dep.keyOrdering == match "Some(keyOrd: Ordering[K])" clause.
     } else if (dep.aggregator.isDefined) {
   
       // whatever is in "if (!aggregated && dep.aggregator.isDefined) {"
   
     } else {
       interruptibleIter
     }
   }
   ```
   
   The proposed code is effectively this - but spread across two blocks tied through `aggregated` flag and `dep.aggregator.isDefined`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "zhengchenyu (via GitHub)" <gi...@apache.org>.
zhengchenyu closed pull request #44512: [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used.
URL: https://github.com/apache/spark/pull/44512


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm closed pull request #44512: [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used.
URL: https://github.com/apache/spark/pull/44512


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "zhengchenyu (via GitHub)" <gi...@apache.org>.
zhengchenyu commented on PR #44512:
URL: https://github.com/apache/spark/pull/44512#issuecomment-1871886874

   @Ngone51 
   Thanks for you review! The combine is the key problem. 
   I still combine, but combine in ExternalSorter will never trigger extra spill. Combine happen in insertAllAndUpdateMetrics and merge:
   (1) combine when insert, just read the exist key and combine the value, do it in memory. 
   (2) combine when merge, we sort the combined iterator, combine util next key occur, also do it in memory.
   The two processes are not additional operations and are originally performed in the ExternalSorter.
   
   Before the change, We use ExternalAppendOnlyMap to combine. If the memory is over threshold, will spill to disk. Then we use ExternalSorter to sort. If the memory is over threshold, will spill to disk. **It means we may spill twice(ExternalAppendOnlyMap and ExternalSorter), if shuffle data is huge.**
   
   We know that we can combine during the sorting process. 
   After the change, we will use ExternalSorter, combine occurs during insert and merge processes. So there is no need to call ExternalAppendOnlyMap to combine. **It means only spill once (ExternalSorter).**
   
   Spill is time consuming. For sort and combine shuffle, when shuffle date is huge, this PR will save time。
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "zhengchenyu (via GitHub)" <gi...@apache.org>.
zhengchenyu commented on PR #44512:
URL: https://github.com/apache/spark/pull/44512#issuecomment-1874801219

   > It looks like `ExternalAppendOnlyMap` uses the key for combine:
   > 
   > ```
   > dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
   > ```
   > 
   > but `ExternalSorter` uses partition id:
   > 
   > ```
   > map.changeValue((actualPartitioner.getPartition(kv._1), kv._1), update)
   > ```
   > 
   > Do you think this would lead to the different result?
   
   @Ngone51 
   It seems that the key for combine is Tuple2(partitionid, key).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #44512:
URL: https://github.com/apache/spark/pull/44512#issuecomment-1912799697

   Can you update to latest @zhengchenyu ? That should hopefully fix the build.
   
   Will give a couple of days more for folks to review the patch in meantime.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #44512:
URL: https://github.com/apache/spark/pull/44512#issuecomment-1925955552

   Merged to master.
   Thanks for adding this @zhengchenyu !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44512:
URL: https://github.com/apache/spark/pull/44512#discussion_r1460207780


##########
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala:
##########
@@ -111,31 +111,50 @@ private[spark] class BlockStoreShuffleReader[K, C](
     // An interruptible iterator must be used here in order to support task cancellation
     val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
 
-    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
+    // Sort the output if there is a sort ordering defined.
+    var aggregated = false
+    // The type of the value cannot be determined here, maybe the type of value
+    // or the type of combined value.
+    val sortedIter: Iterator[Product2[K, Nothing]] = dep.keyOrdering match {
+      case Some(keyOrd: Ordering[K]) =>
+        // Create an ExternalSorter to sort the data.
+        val sorter: ExternalSorter[K, _, C] = if (dep.aggregator.isDefined) {
+          aggregated = true
+          if (dep.mapSideCombine) {
+            new ExternalSorter[K, C, C](context,
+              Option(new Aggregator[K, C, C](identity,
+                dep.aggregator.get.mergeCombiners,
+                dep.aggregator.get.mergeCombiners)),
+              ordering = Some(keyOrd), serializer = dep.serializer)
+          } else {
+            new ExternalSorter[K, Nothing, C](context,
+              dep.aggregator.asInstanceOf[Option[Aggregator[K, Nothing, C]]],
+              ordering = Some(keyOrd), serializer = dep.serializer)
+          }
+        } else {
+          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd),
+            serializer = dep.serializer)
+        }
+        sorter.insertAllAndUpdateMetrics(interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]).
+          asInstanceOf[Iterator[(K, Nothing)]]
+      case None =>
+        interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+    }
+
+    val resultIter: Iterator[Product2[K, C]] = if (!aggregated && dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
         // We are reading values that are already combined
-        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
+        val combinedKeyValuesIterator = sortedIter.asInstanceOf[Iterator[(K, C)]]
         dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
       } else {
         // We don't know the value type, but also don't care -- the dependency *should*
         // have made sure its compatible w/ this aggregator, which will convert the value
         // type to the combined type C
-        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+        val keyValuesIterator = sortedIter.asInstanceOf[Iterator[(K, Nothing)]]
         dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
       }
     } else {
-      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
-    }

Review Comment:
   We can drop the `aggregated` flag.
   
   In other words, we can make this simply:
   
   ```
   val resultIter = {
     if (dep.keyOrdering.isDefined) {
       // whatever is in dep.keyOrdering == match `Some(keyOrd: Ordering[K])` clause.
     } else if (dep.aggregator.isDefined) {
   
       // whatever is in "if (!aggregated && dep.aggregator.isDefined) {"
   
     } else {
       interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
     }
   }
   ```
   
   The proposed code is effectively this - but spread across two blocks tied through `aggregated` flag.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "zhengchenyu (via GitHub)" <gi...@apache.org>.
zhengchenyu commented on PR #44512:
URL: https://github.com/apache/spark/pull/44512#issuecomment-1963155578

   @waitinfuture 
   (1) performance differs
   Experimental data were recorded in https://issues.apache.org/jira/browse/SPARK-46512. How much performance improvement depends on the experimental environment. In my experiments, the correlation time was reduced from 75 seconds to 29 seconds.
   
   (2) total number of spilling differs
   The reduction in the number of spills is obvious. 
   Before this PR, we combine the unsorted records, then sort. When we combine the unsorted records, we us ExternalAppendOnlyMap. They may spill for large data. Then when we sort, we still spill for large data.
   After this PR, when we sort, we can easily organize the same keys together, and then we no longer have to use ExternalAppendOnlyMap to combine.
   After this PR, we will save the spilled process which we must do in combine.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #44512:
URL: https://github.com/apache/spark/pull/44512#issuecomment-1876634134

   Thanks for looking into this @Ngone51 !
   Will take a look over this PR later this week/next - sorry for the delay, was away on a vacation.
   +CC @otterc as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #44512:
URL: https://github.com/apache/spark/pull/44512#issuecomment-1962905759

   > > > but combine in ExternalSorter will never trigger extra spill.
   > > 
   > > 
   > > It could be triggered by `ExternalSorter#maybeSpillCollection`?
   > 
   > @Ngone51 Thanks for your reply!
   > 
   > Yes, It could be triggered by `ExternalSorter#maybeSpillCollection`. I mean that the Extra spill is trigger by `ExternalAppendOnlyMap#spill`
   > 
   > Before the change, both`ExternalAppendOnlyMap#spill` and `ExternalSorter#maybeSpillCollection` will be spill. After the change, only `ExternalSorter#maybeSpillCollection` will be spill.
   > 
   > I created a huge amount of data, and I found the following logs:
   > 
   > ```
   > 23/12/26 16:55:52 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 522.3 MiB to disk (1 time so far)
   > 23/12/26 16:56:09 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 629.8 MiB to disk (2 times so far)
   > 23/12/26 16:56:40 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 491.5 MiB to disk (3 times so far)
   > 23/12/26 16:56:58 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 553.0 MiB to disk (4 times so far)
   > 23/12/26 16:57:16 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 442.9 MiB to disk (5 times so far)
   > 23/12/26 16:57:47 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 576.1 MiB to disk (6 times so far)
   > 23/12/26 16:58:04 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 540.2 MiB to disk (7 times so far)
   > 23/12/26 16:58:22 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 686.3 MiB to disk (8 times so far)
   > 23/12/26 16:58:52 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 540.2 MiB to disk (9 times so far)
   > 23/12/26 17:00:06 INFO ExternalAppendOnlyMap: Task 5 force spilling in-memory map to disk and it will release 942.2 MiB memory
   > 23/12/26 17:00:27 INFO ExternalSorter: Thread 48 spilling in-memory map of 954.0 MiB to disk (1 time so far)
   > 23/12/26 17:01:24 INFO ExternalSorter: Thread 48 spilling in-memory map of 954.0 MiB to disk (2 times so far)
   > 23/12/26 17:02:24 INFO ExternalSorter: Thread 48 spilling in-memory map of 959.1 MiB to disk (3 times so far)
   > 23/12/26 17:03:23 INFO ExternalSorter: Thread 48 spilling in-memory map of 954.0 MiB to disk (4 times so far)
   > 23/12/26 17:04:30 INFO ExternalSorter: Thread 48 spilling in-memory map of 948.8 MiB to disk (5 times so far)
   > 23/12/26 17:05:24 INFO ExternalSorter: Thread 48 spilling in-memory map of 943.7 MiB to disk (6 times so far)
   > 23/12/26 17:06:25 INFO ExternalSorter: Thread 48 spilling in-memory map of 954.0 MiB to disk (7 times so far)
   > 23/12/26 17:07:08 INFO ExternalSorter: Thread 48 spilling in-memory map of 959.1 MiB to disk (8 times so far)
   > 23/12/26 17:07:58 INFO ExternalSorter: Thread 48 spilling in-memory map of 959.1 MiB to disk (9 times so far) 
   > ```
   > 
   > After the change, we will only found ExternalSorter spill log.
   
   Hi @zhengchenyu , curious is the total number of spilling differ before and after this PR? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "zhengchenyu (via GitHub)" <gi...@apache.org>.
zhengchenyu commented on code in PR #44512:
URL: https://github.com/apache/spark/pull/44512#discussion_r1461297075


##########
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala:
##########
@@ -111,31 +111,50 @@ private[spark] class BlockStoreShuffleReader[K, C](
     // An interruptible iterator must be used here in order to support task cancellation
     val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
 
-    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
+    // Sort the output if there is a sort ordering defined.
+    var aggregated = false
+    // The type of the value cannot be determined here, maybe the type of value
+    // or the type of combined value.
+    val sortedIter: Iterator[Product2[K, Nothing]] = dep.keyOrdering match {
+      case Some(keyOrd: Ordering[K]) =>
+        // Create an ExternalSorter to sort the data.
+        val sorter: ExternalSorter[K, _, C] = if (dep.aggregator.isDefined) {
+          aggregated = true
+          if (dep.mapSideCombine) {
+            new ExternalSorter[K, C, C](context,
+              Option(new Aggregator[K, C, C](identity,
+                dep.aggregator.get.mergeCombiners,
+                dep.aggregator.get.mergeCombiners)),
+              ordering = Some(keyOrd), serializer = dep.serializer)
+          } else {
+            new ExternalSorter[K, Nothing, C](context,
+              dep.aggregator.asInstanceOf[Option[Aggregator[K, Nothing, C]]],
+              ordering = Some(keyOrd), serializer = dep.serializer)
+          }
+        } else {
+          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd),
+            serializer = dep.serializer)
+        }
+        sorter.insertAllAndUpdateMetrics(interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]).
+          asInstanceOf[Iterator[(K, Nothing)]]
+      case None =>
+        interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+    }
+
+    val resultIter: Iterator[Product2[K, C]] = if (!aggregated && dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
         // We are reading values that are already combined
-        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
+        val combinedKeyValuesIterator = sortedIter.asInstanceOf[Iterator[(K, C)]]
         dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
       } else {
         // We don't know the value type, but also don't care -- the dependency *should*
         // have made sure its compatible w/ this aggregator, which will convert the value
         // type to the combined type C
-        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+        val keyValuesIterator = sortedIter.asInstanceOf[Iterator[(K, Nothing)]]
         dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
       }
     } else {
-      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
-    }

Review Comment:
   Thanks for your review! This does make the code cleaner indeed! I will fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #44512:
URL: https://github.com/apache/spark/pull/44512#issuecomment-1874771642

   > but combine in ExternalSorter will never trigger extra spill.
   
   It could be triggered by `ExternalSorter#maybeSpillCollection`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #44512:
URL: https://github.com/apache/spark/pull/44512#issuecomment-1874778788

   It looks like `ExternalAppendOnlyMap` uses the key for combine:
   ```
   dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
   ```
   
    but `ExternalSorter` uses partition id:
   
   ``` 
   map.changeValue((actualPartitioner.getPartition(kv._1), kv._1), update)
   ```
   
   Do you think this would lead to the different result?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "waitinfuture (via GitHub)" <gi...@apache.org>.
waitinfuture commented on PR #44512:
URL: https://github.com/apache/spark/pull/44512#issuecomment-1963172789

   > @waitinfuture (1) performance differs Experimental data were recorded in https://issues.apache.org/jira/browse/SPARK-46512. How much performance improvement depends on the experimental environment. In my experiments, the correlation time was reduced from 75 seconds to 29 seconds.
   > 
   > (2) total number of spilling differs The reduction in the number of spills is obvious. Before this PR, we combine the unsorted records, then sort. When we combine the unsorted records, we us ExternalAppendOnlyMap. They may spill for large data. Then when we sort, we still spill for large data. After this PR, when we sort, we can easily organize the same keys together, and then we no longer have to use ExternalAppendOnlyMap to combine. After this PR, we will save the spilled process which we must do in combine.
   
   Got it, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #44512:
URL: https://github.com/apache/spark/pull/44512#issuecomment-1875431664

   I see @zhengchenyu 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "zhengchenyu (via GitHub)" <gi...@apache.org>.
zhengchenyu commented on PR #44512:
URL: https://github.com/apache/spark/pull/44512#issuecomment-1874780989

   > > but combine in ExternalSorter will never trigger extra spill.
   > 
   > It could be triggered by `ExternalSorter#maybeSpillCollection`?
   
   @Ngone51 Thanks for your reply!
   
   Yes, It could be triggered by `ExternalSorter#maybeSpillCollection`. I mean that the Extra spill is trigger by `ExternalAppendOnlyMap#spill`
   
   Before the change, both`ExternalAppendOnlyMap#spill` and `ExternalSorter#maybeSpillCollection` will be spill. After the change, only `ExternalSorter#maybeSpillCollection` will be spill.
   
   I created a huge amount of data, and I found the following logs:
   
   ```
   23/12/26 16:55:52 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 522.3 MiB to disk (1 time so far)
   23/12/26 16:56:09 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 629.8 MiB to disk (2 times so far)
   23/12/26 16:56:40 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 491.5 MiB to disk (3 times so far)
   23/12/26 16:56:58 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 553.0 MiB to disk (4 times so far)
   23/12/26 16:57:16 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 442.9 MiB to disk (5 times so far)
   23/12/26 16:57:47 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 576.1 MiB to disk (6 times so far)
   23/12/26 16:58:04 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 540.2 MiB to disk (7 times so far)
   23/12/26 16:58:22 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 686.3 MiB to disk (8 times so far)
   23/12/26 16:58:52 INFO ExternalAppendOnlyMap: Thread 48 spilling in-memory map of 540.2 MiB to disk (9 times so far)
   23/12/26 17:00:06 INFO ExternalAppendOnlyMap: Task 5 force spilling in-memory map to disk and it will release 942.2 MiB memory
   23/12/26 17:00:27 INFO ExternalSorter: Thread 48 spilling in-memory map of 954.0 MiB to disk (1 time so far)
   23/12/26 17:01:24 INFO ExternalSorter: Thread 48 spilling in-memory map of 954.0 MiB to disk (2 times so far)
   23/12/26 17:02:24 INFO ExternalSorter: Thread 48 spilling in-memory map of 959.1 MiB to disk (3 times so far)
   23/12/26 17:03:23 INFO ExternalSorter: Thread 48 spilling in-memory map of 954.0 MiB to disk (4 times so far)
   23/12/26 17:04:30 INFO ExternalSorter: Thread 48 spilling in-memory map of 948.8 MiB to disk (5 times so far)
   23/12/26 17:05:24 INFO ExternalSorter: Thread 48 spilling in-memory map of 943.7 MiB to disk (6 times so far)
   23/12/26 17:06:25 INFO ExternalSorter: Thread 48 spilling in-memory map of 954.0 MiB to disk (7 times so far)
   23/12/26 17:07:08 INFO ExternalSorter: Thread 48 spilling in-memory map of 959.1 MiB to disk (8 times so far)
   23/12/26 17:07:58 INFO ExternalSorter: Thread 48 spilling in-memory map of 959.1 MiB to disk (9 times so far) 
   ```
   
   After the change, we will only found ExternalSorter spill log.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46512][CORE] Optimize shuffle reading when both sort and combine are used. [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44512:
URL: https://github.com/apache/spark/pull/44512#discussion_r1460207780


##########
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala:
##########
@@ -111,31 +111,50 @@ private[spark] class BlockStoreShuffleReader[K, C](
     // An interruptible iterator must be used here in order to support task cancellation
     val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
 
-    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
+    // Sort the output if there is a sort ordering defined.
+    var aggregated = false
+    // The type of the value cannot be determined here, maybe the type of value
+    // or the type of combined value.
+    val sortedIter: Iterator[Product2[K, Nothing]] = dep.keyOrdering match {
+      case Some(keyOrd: Ordering[K]) =>
+        // Create an ExternalSorter to sort the data.
+        val sorter: ExternalSorter[K, _, C] = if (dep.aggregator.isDefined) {
+          aggregated = true
+          if (dep.mapSideCombine) {
+            new ExternalSorter[K, C, C](context,
+              Option(new Aggregator[K, C, C](identity,
+                dep.aggregator.get.mergeCombiners,
+                dep.aggregator.get.mergeCombiners)),
+              ordering = Some(keyOrd), serializer = dep.serializer)
+          } else {
+            new ExternalSorter[K, Nothing, C](context,
+              dep.aggregator.asInstanceOf[Option[Aggregator[K, Nothing, C]]],
+              ordering = Some(keyOrd), serializer = dep.serializer)
+          }
+        } else {
+          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd),
+            serializer = dep.serializer)
+        }
+        sorter.insertAllAndUpdateMetrics(interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]).
+          asInstanceOf[Iterator[(K, Nothing)]]
+      case None =>
+        interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+    }
+
+    val resultIter: Iterator[Product2[K, C]] = if (!aggregated && dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
         // We are reading values that are already combined
-        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
+        val combinedKeyValuesIterator = sortedIter.asInstanceOf[Iterator[(K, C)]]
         dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
       } else {
         // We don't know the value type, but also don't care -- the dependency *should*
         // have made sure its compatible w/ this aggregator, which will convert the value
         // type to the combined type C
-        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+        val keyValuesIterator = sortedIter.asInstanceOf[Iterator[(K, Nothing)]]
         dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
       }
     } else {
-      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
-    }

Review Comment:
   We can drop the `aggregated` flag.
   
   In other words, we can make this simply:
   
   ```
   val resultIter = {
     if (dep.keyOrdering.isDefined) {
       // whatever is in dep.keyOrdering == match "Some(keyOrd: Ordering[K])" clause.
     } else if (dep.aggregator.isDefined) {
   
       // whatever is in "if (!aggregated && dep.aggregator.isDefined) {"
   
     } else {
       interruptibleIter
     }
   }
   ```
   
   The proposed code is effectively this - but spread across two blocks tied through `aggregated` flag.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org