Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/10/03 13:57:27 UTC

[jira] [Resolved] (SPARK-10476) Enable common RDD operations on standard Scala collections

     [ https://issues.apache.org/jira/browse/SPARK-10476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-10476.
-------------------------------
    Resolution: Won't Fix

> Enable common RDD operations on standard Scala collections
> ----------------------------------------------------------
>
>                 Key: SPARK-10476
>                 URL: https://issues.apache.org/jira/browse/SPARK-10476
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Simeon Simeonov
>            Priority: Minor
>              Labels: core, mapPartitions, rdd
>
> A common pattern in Spark development is to look for opportunities to leverage data locality using mechanisms such as {{mapPartitions}}. Often this happens when an existing set of RDD transformations is refactored to improve performance. At that point, significant code refactoring may be required because the input is {{Iterator[T]}} as opposed to an RDD. The most common examples we've encountered so far involve the {{*ByKey}} methods, {{sample}}, and {{takeSample}}. We have also observed cases where, due to changes in the structure of the data, use of {{mapPartitions}} is no longer possible and the code has to be converted to use the RDD API.
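> For instance (a minimal sketch with illustrative names and data, not taken from a real pipeline), logic that was a single {{reduceByKey}} call must be hand-rolled once it moves inside {{mapPartitions}}:
> {code}
> val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
> // RDD version: the *ByKey methods are available directly.
> val counts = pairs.reduceByKey(_ + _)
> // Inside mapPartitions the input is Iterator[(String, Int)], which has no
> // reduceByKey, so the per-partition combine must be written by hand:
> val partial = pairs.mapPartitions { it =>
>   it.foldLeft(Map.empty[String, Int]) { case (m, (k, v)) =>
>     m.updated(k, m.getOrElse(k, 0) + v)
>   }.toIterator
> }
> // Note: this combines within each partition only; a shuffle is still
> // needed to merge results across partitions.
> {code}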
> If data manipulation through the RDD API could be applied to the standard Scala data structures, then refactoring Spark data pipelines would become faster and less bug-prone. Also, and this is no small benefit, the thoughtfulness and experience of the Spark community could spread to the broader Scala community.
> There are multiple approaches to solving this problem, including but not limited to creating a set of {{Local*RDD}} classes and/or adding implicit conversions.
> Here is a simple example meant to be short as opposed to complete or performance-optimized:
> {code}
> implicit class LocalRDD[T](it: Iterator[T]) extends Iterable[T] {
>   def this(collection: Iterable[T]) = this(collection.toIterator)
>   // Single-pass: the backing iterator can be traversed only once.
>   def iterator = it
> }
> implicit class LocalPairRDD[K, V](it: Iterator[(K, V)]) extends Iterable[(K, V)] {
>   def this(collection: Iterable[(K, V)]) = this(collection.toIterator)
>   def iterator = it
>   // Mirrors RDD.groupByKey: group the pairs by key, keeping only the values.
>   def groupByKey() = new LocalPairRDD[K, Iterable[V]](
>     groupBy(_._1).map { case (k, valuePairs) => (k, valuePairs.map(_._2)) }
>   )
> }
> sc.
>   parallelize(Array((1, 10), (2, 10), (1, 20))).
>   repartition(1).
>   mapPartitions(data => data.groupByKey().toIterator).
>   take(2)
> // Array[(Int, Iterable[Int])] = Array((2,List(10)), (1,List(10, 20)))
> {code} 
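> One caveat worth noting about the sketch above: because the wrapper is backed by an {{Iterator}}, it is single-pass, e.g.:
> {code}
> val local = new LocalPairRDD(Iterator((1, 10), (1, 20)))
> local.groupByKey()  // consumes the backing iterator
> local.groupByKey()  // empty result: the iterator is already exhausted
> {code}
> A {{Local*RDD}} implementation meant for repeated traversal would need to buffer the partition (e.g. via {{toVector}}), trading memory for re-traversability.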



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org