Posted to issues@spark.apache.org by "Madhu Siddalingaiah (JIRA)" <ji...@apache.org> on 2014/06/01 19:39:01 UTC

[jira] [Commented] (SPARK-983) Support external sorting for RDD#sortByKey()

    [ https://issues.apache.org/jira/browse/SPARK-983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14015020#comment-14015020 ] 

Madhu Siddalingaiah commented on SPARK-983:
-------------------------------------------

I tested some additions locally that seem to work well so far. I created a SortedPartitionsRDD and a sortPartitions(...) method in [RDD|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala]:

{code}
  /**
   * Return a new RDD containing sorted partitions in this RDD.
   */
  def sortPartitions(lt: (T, T) => Boolean): RDD[T] = new SortedPartitionsRDD(this, sc.clean(lt))
{code}
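
As a rough illustration of the intended semantics (not the actual SortedPartitionsRDD code), the sketch below models an RDD as a Vector of partitions and sorts each partition on its own. The object name SortPartitionsSketch and the Vector-of-Vectors stand-in are assumptions made for the example; only the shape of the lt parameter comes from the method above.

```scala
// Hypothetical stand-in for an RDD: each inner Vector models one partition.
object SortPartitionsSketch {
  // Sort every partition independently with the caller's less-than function.
  // No element moves between partitions, so the result is sorted within each
  // partition but not necessarily across partitions.
  def sortPartitions[T](partitions: Vector[Vector[T]])(lt: (T, T) => Boolean): Vector[Vector[T]] =
    partitions.map(_.sortWith(lt))
}
```

For example, SortPartitionsSketch.sortPartitions(Vector(Vector(3, 1, 2), Vector(9, 7, 8)))(_ < _) yields Vector(Vector(1, 2, 3), Vector(7, 8, 9)). A total order across partitions would still need a range partitioning step first, as sortByKey() does.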

I haven't added the spill/merge code to SortedPartitionsRDD yet. I wanted to get some buy-in on this method first, since it's an addition to the API. It fits nicely with [OrderedRDDFunctions|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala] and passes all tests in [SortingSuite|https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala].

I think this method can be used to address [SPARK-1021|https://issues.apache.org/jira/browse/SPARK-1021] as well as many use cases outside of sortByKey(). Does everyone agree? If so, I'll move forward with external sort in SortedPartitionsRDD and necessary tests.
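
To make the spill/merge idea concrete, here is a minimal sketch of a generic external sort, assuming Int elements, a fixed element-count limit in place of real memory-pressure detection, and plain text temp files. None of the names (ExternalSortSketch, externalSort, maxInMemory) come from Spark, and this is not the planned SortedPartitionsRDD implementation.

```scala
import java.io.{BufferedReader, File, FileReader, PrintWriter}
import scala.collection.mutable

// Hypothetical illustration only; names and file format are assumptions.
object ExternalSortSketch {
  // Write one sorted run to a temporary file, one value per line.
  private def spill(run: Seq[Int]): File = {
    val file = File.createTempFile("sort-run-", ".txt")
    file.deleteOnExit()
    val out = new PrintWriter(file)
    try run.foreach(v => out.println(v)) finally out.close()
    file
  }

  // Sort the input while the sort buffer holds at most maxInMemory values.
  // The merged result is collected into a Vector to keep the sketch short;
  // a real implementation would return a lazy iterator instead.
  def externalSort(input: Iterator[Int], maxInMemory: Int): Vector[Int] = {
    require(maxInMemory > 0, "maxInMemory must be positive")
    // Phase 1: cut the input into chunks, sort each chunk, spill it to disk.
    val runs = input.grouped(maxInMemory).map(chunk => spill(chunk.sorted)).toVector

    // Phase 2: k-way merge. The heap holds one (head value, run index) per run,
    // ordered so that the smallest head value is dequeued first.
    val readers = runs.map(f => new BufferedReader(new FileReader(f)))
    val heap = mutable.PriorityQueue.empty[(Int, Int)](
      Ordering.by[(Int, Int), Int](_._1).reverse)
    def refill(i: Int): Unit = {
      val line = readers(i).readLine()
      if (line != null) heap.enqueue((line.toInt, i))
    }
    try {
      readers.indices.foreach(refill)
      val merged = Vector.newBuilder[Int]
      while (heap.nonEmpty) {
        val (value, i) = heap.dequeue()
        merged += value
        refill(i) // pull the next value from the run we just consumed
      }
      merged.result()
    } finally {
      readers.foreach(_.close())
      runs.foreach(_.delete())
    }
  }
}
```

Calling ExternalSortSketch.externalSort(Iterator(5, 3, 9, 1, 7, 2, 8), 3) writes three sorted runs and merges them into Vector(1, 2, 3, 5, 7, 8, 9). During the merge only one value per run sits in the heap, which is what keeps memory bounded for a skewed partition.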

> Support external sorting for RDD#sortByKey()
> --------------------------------------------
>
>                 Key: SPARK-983
>                 URL: https://issues.apache.org/jira/browse/SPARK-983
>             Project: Spark
>          Issue Type: New Feature
>    Affects Versions: 0.9.0
>            Reporter: Reynold Xin
>            Assignee: Madhu Siddalingaiah
>
> Currently, RDD#sortByKey() is implemented by a mapPartitions which creates a buffer to hold the entire partition, then sorts it. This will cause an OOM if an entire partition cannot fit in memory, which is especially problematic for skewed data. Rather than OOMing, the behavior should be similar to the [ExternalAppendOnlyMap|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala], where we fall back to disk if we detect memory pressure.


