Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/10/08 05:44:16 UTC

[jira] [Resolved] (SPARK-24587) RDD.takeOrdered uses reduce, pulling all partition data to the driver

     [ https://issues.apache.org/jira/browse/SPARK-24587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-24587.
----------------------------------
    Resolution: Incomplete

> RDD.takeOrdered uses reduce, pulling all partition data to the driver
> ---------------------------------------------------------------------
>
>                 Key: SPARK-24587
>                 URL: https://issues.apache.org/jira/browse/SPARK-24587
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: Ryan Deak
>            Priority: Major
>              Labels: bulk-closed
>
> *NOTE*: _This is likely a *very* impactful change. It probably only matters when {{num}} is large, but without something like the proposed change, algorithms based on distributed {{top-K}} don't scale well._
> h2. Description
> {{[RDD.takeOrdered|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1432-L1437]}} uses {{[reduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1011]}} to combine {{num}}\-sized {{BoundedPriorityQueue}} instances, where {{num}} is the size of the returned {{Array}}.  Consequently, errors can occur even when the size of the returned value is small relative to the driver memory.
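> For reference, the relevant part of the current implementation is roughly the following (a simplified paraphrase of the linked source, not a verbatim copy):
> {code:language=scala}
> // Simplified paraphrase of RDD.takeOrdered (see the links above); not verbatim.
> val mapRDDs = rdd.mapPartitions { items =>
>   // Each partition keeps only its own top `num` elements in a bounded queue.
>   val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>   queue ++= items
>   Iterator.single(queue)
> }
> // reduce() ships every per-partition queue back to the driver and merges them
> // there, so driver memory scales with (#partitions * num) rather than with num.
> mapRDDs.reduce { (queue1, queue2) =>
>   queue1 ++= queue2
>   queue1
> }.toArray.sorted(ord)
> {code}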
> An example error is:
> {code}
> 18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 28 tasks (8.1 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
> 18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 29 tasks (8.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
> ...
> 18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 160 tasks (46.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
> {code}
> It's clear from this message that although each serialized task result (a single {{num}}\-sized queue, roughly the size of the final result) is only about *0.3 GB* ({{46.4/160}}), the driver memory required to combine all of the results is more than {{46 GB}}.
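> As a back-of-envelope check (the numbers below are taken from the log above):
> {code:language=scala}
> // With reduce(), one num-sized queue per task is serialized back to the driver.
> val tasks = 160
> val perQueueGB = 46.4 / tasks        // ~0.29 GB for a single num-sized queue
> val driverGB = tasks * perQueueGB    // ~46.4 GB accumulated on the driver
> {code}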
> h2. Proposed Solution
> The amount of driver memory required can be dramatically reduced by using {{[treeReduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1040]}}, for instance by replacing the {{else}} clause with:
> {code:language=scala}
> else {
>   import scala.math.{ceil, log, max}
>   // depth = ceil(log2(#partitions)): a roughly binary merge tree, so queues are
>   // combined on executors level by level rather than all at once on the driver.
>   val depth = max(1, ceil(log(mapRDDs.partitions.length) / log(2)).toInt)
>   mapRDDs.treeReduce(
>     (queue1, queue2) => queue1 ++= queue2,  // ++= returns the merged queue1
>     depth
>   ).toArray.sorted(ord)
> }
> {code}
> This should require less than double the network traffic, while scaling to much larger values of the {{num}} parameter without configuration changes or beefier machines.
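> For illustration, here is a minimal, self-contained sketch of the same idea using only public APIs ({{treeAggregate}} rather than the internal {{BoundedPriorityQueue}}); the value of {{k}}, the sample data, and the helper {{smallest}} are illustrative assumptions, not part of the proposed patch:
> {code:language=scala}
> import org.apache.spark.SparkContext
>
> val sc = SparkContext.getOrCreate()
> val k = 5
> val data = sc.parallelize(1 to 1000000, numSlices = 160)
>
> // Same depth heuristic as the proposed change: a roughly binary merge tree.
> val depth = math.max(1, math.ceil(math.log(data.getNumPartitions) / math.log(2)).toInt)
>
> // Keep only the k smallest elements of a small collection (not optimized;
> // a real implementation would use a bounded heap, as takeOrdered does).
> def smallest(xs: Seq[Int], k: Int): Vector[Int] = xs.sorted.take(k).toVector
>
> val topK = data.treeAggregate(Vector.empty[Int])(
>   seqOp = (acc, x) => smallest(acc :+ x, k),  // fold each element into a k-sized buffer
>   combOp = (a, b) => smallest(a ++ b, k),     // merge two k-sized buffers on executors
>   depth = depth                               // ~log2(#partitions) merge levels
> )
>
> println(topK.mkString(", "))  // the k smallest values: 1, 2, 3, 4, 5
> {code}
> With {{treeReduce}}/{{treeAggregate}}, the per-partition queues are merged on executors over several levels, so the driver only receives the last level's few partial results instead of one queue per task.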
> h2. Code Potentially Impacted
> * ML Lib's {{[CountVectorizer|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala#L232]}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org