Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2020/07/07 19:44:00 UTC

[jira] [Commented] (SPARK-32212) RDD.takeOrdered can choose to merge intermediate results in executor or driver

    [ https://issues.apache.org/jira/browse/SPARK-32212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153059#comment-17153059 ] 

Apache Spark commented on SPARK-32212:
--------------------------------------

User 'izchen' has created a pull request for this issue:
https://github.com/apache/spark/pull/29028

> RDD.takeOrdered can choose to merge intermediate results in executor or driver
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-32212
>                 URL: https://issues.apache.org/jira/browse/SPARK-32212
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Chen Zhang
>            Priority: Major
>
> In the issue list, I saw several discussions about exceeding the driver's memory limit when using _RDD.takeOrdered_ or SQL (_ORDER BY ... LIMIT ..._). I think the implementation of _RDD.takeOrdered_ can be improved.
> In the current implementation of _RDD.takeOrdered_, the QuickSelect algorithm from Guava is used in the executor process to compute the local top-K result of each RDD partition. These intermediate results are packed into a java.util.PriorityQueue and returned to the driver process, where they are merged into the global top-K result.
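> For reference, a minimal sketch of that executor-side selection step (the helper name localTopK is illustrative; Ordering.leastOf is Guava's QuickSelect-based selection, which is what org.apache.spark.util.collection.Utils.takeOrdered wraps):
> {code:scala}
> import scala.collection.JavaConverters._
> import com.google.common.collect.{Ordering => GuavaOrdering}
>
> // Select the smallest `num` elements of one partition in O(n) expected time;
> // Guava's leastOf returns them in ascending order.
> def localTopK[T](items: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = {
>   val guavaOrd = new GuavaOrdering[T] {
>     override def compare(l: T, r: T): Int = ord.compare(l, r)
>   }
>   guavaOrd.leastOf(items.asJava, num).iterator.asScala
> }
> {code}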
> The problem with this implementation is that when the intermediate results are large and there are many partitions, they can accumulate in the driver process's memory and cause excessive memory pressure. In the worst case the driver holds up to num elements per partition, e.g. up to 10^8 elements for 1,000 partitions with num = 100,000.
> We can add an optional config that determines whether the per-partition intermediate results (local top-K) of _RDD.takeOrdered_ are merged in the driver process or in an executor process. If set to true, they are merged in the driver (via util.PriorityQueue), which returns the result sooner; but when the intermediate results are large and there are many partitions, they may accumulate in driver memory and cause excessive memory pressure. If set to false, they are merged in an executor (via Guava's QuickSelect); intermediate results then never accumulate in driver memory, but the extra single-partition stage makes the job run longer.
> something like:
> _(org.apache.spark.rdd.RDD)_
> {code:scala}
>   // Assumes the imports already present in org.apache.spark.rdd.RDD, e.g.
>   // org.apache.spark.util.BoundedPriorityQueue and
>   // org.apache.spark.util.collection.{Utils => collectionUtils}.
>   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>     if (num == 0 || partitions.length == 0) {
>       Array.empty
>     } else {
>       if (conf.getBoolean("spark.rdd.takeOrdered.mergeInDriver", true)) {
>         val mapRDDs = mapPartitions { items =>
>           // Priority keeps the largest elements, so let's reverse the ordering.
>           val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>           queue ++= collectionUtils.takeOrdered(items, num)(ord)
>           Iterator.single(queue)
>         }
>         // Merge the per-partition queues in the driver process.
>         mapRDDs.reduce { (queue1, queue2) =>
>           queue1 ++= queue2
>           queue1
>         }.toArray.sorted(ord)
>       } else {
>         mapPartitions { items =>
>           collectionUtils.takeOrdered(items, num)(ord)
>         }.repartition(1).mapPartitions { items =>
>           // Merge all local top-K results in a single executor task.
>           collectionUtils.takeOrdered(items, num)(ord)
>         }.collect()
>       }
>     }
>   }
> {code}
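> For illustration, a hypothetical driver-side usage of the proposed flag (the config name spark.rdd.takeOrdered.mergeInDriver comes from the sketch above and is not an existing Spark config):
> {code:scala}
> import org.apache.spark.{SparkConf, SparkContext}
>
> val conf = new SparkConf()
>   .setAppName("takeOrdered-demo")
>   .setMaster("local[4]")
>   // false: merge the local top-K results in an executor instead of the driver
>   .set("spark.rdd.takeOrdered.mergeInDriver", "false")
> val sc = new SparkContext(conf)
>
> // Smallest 10 values under the natural ordering.
> val top10 = sc.parallelize(1 to 1000000, numSlices = 200).takeOrdered(10)
> {code}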



