Posted to issues@spark.apache.org by "Simeon Simeonov (JIRA)" <ji...@apache.org> on 2015/09/07 17:57:46 UTC
[jira] [Updated] (SPARK-10476) Enable common RDD operations on
standard Scala collections
[ https://issues.apache.org/jira/browse/SPARK-10476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Simeon Simeonov updated SPARK-10476:
------------------------------------
Summary: Enable common RDD operations on standard Scala collections (was: Add common RDD operations on standard Scala collections)
> Enable common RDD operations on standard Scala collections
> ----------------------------------------------------------
>
> Key: SPARK-10476
> URL: https://issues.apache.org/jira/browse/SPARK-10476
> Project: Spark
> Issue Type: New Feature
> Components: Spark Core
> Affects Versions: 1.4.1
> Reporter: Simeon Simeonov
> Priority: Minor
> Labels: core, mapPartitions, rdd
>
> A common pattern in Spark development is to look for opportunities to exploit data locality using mechanisms such as {{mapPartitions}}. Often this happens when an existing set of RDD transformations is refactored to improve performance. At that point, significant code refactoring may be required because the input is an {{Iterator\[T]}} as opposed to an RDD. The most common examples we've encountered so far involve the {{*ByKey}} methods, {{sample}} and {{takeSample}}. We have also observed cases where, due to changes in the structure of the data, use of {{mapPartitions}} is no longer possible and the code has to be converted to use the RDD API.
> If data manipulation through the RDD API could be applied to the standard Scala data structures, then refactoring Spark data pipelines would become faster and less bug-prone. Also, and this is no small benefit, the thoughtfulness and experience of the Spark community could spread to the broader Scala community.
> There are multiple approaches to solving this problem, including but not limited to creating a set of {{Local*RDD}} classes and/or adding implicit conversions.
> Here is a simple example meant to be short as opposed to complete or performance-optimized:
> {code}
> // Wraps an Iterator so that Iterable operations such as groupBy are available on it.
> // Note: iterator returns the wrapped Iterator itself, so the wrapper can be traversed only once.
> implicit class LocalRDD[T](it: Iterator[T]) extends Iterable[T] {
>   def this(collection: Iterable[T]) = this(collection.toIterator)
>   def iterator = it
> }
> // Pair variant that adds an RDD-style groupByKey.
> implicit class LocalPairRDD[K, V](it: Iterator[(K, V)]) extends Iterable[(K, V)] {
>   def this(collection: Iterable[(K, V)]) = this(collection.toIterator)
>   def iterator = it
>   def groupByKey() = new LocalPairRDD[K, Iterable[V]](
>     groupBy(_._1).map { case (k, valuePairs) => (k, valuePairs.map(_._2)) }
>   )
> }
> sc.
>   parallelize(Array((1, 10), (2, 10), (1, 20))).
>   repartition(1).
>   mapPartitions(data => data.groupByKey().toIterator).
>   take(2)
> // Array[(Int, Iterable[Int])] = Array((2,List(10)), (1,List(10, 20)))
> {code}
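The description names the {{*ByKey}} methods as the most common pain point, so here is a minimal sketch of the implicit-conversion approach applied to a local reduceByKey. It runs without Spark. The names LocalOps and LocalPairOps are hypothetical and are not part of Spark or of the proposal above; the method only mirrors the shape of the RDD reduceByKey and makes no claim about how Spark would implement it.

```scala
import scala.collection.mutable

object LocalOps {
  // Hypothetical helper (not a Spark API): adds an RDD-style reduceByKey to a plain Iterator.
  implicit class LocalPairOps[K, V](it: Iterator[(K, V)]) {
    // Consumes the iterator once, merging the values of each key with f.
    // Keys come back in first-seen order because a LinkedHashMap is used.
    def reduceByKey(f: (V, V) => V): Iterator[(K, V)] = {
      val acc = mutable.LinkedHashMap.empty[K, V]
      it.foreach { case (k, v) =>
        val merged = acc.get(k).map(prev => f(prev, v)).getOrElse(v)
        acc.update(k, merged)
      }
      acc.iterator
    }
  }
}

import LocalOps._

// Same data as the example above, reduced locally instead of grouped.
val local = Iterator((1, 10), (2, 10), (1, 20)).reduceByKey(_ + _).toList
// local == List((1, 30), (2, 10))

// Inside Spark the same call would fit the function passed to mapPartitions,
// assuming rdd is an RDD[(Int, Int)]:
//   rdd.mapPartitions(_.reduceByKey(_ + _))
```

Note that calling this per partition reduces only within each partition, so it matches the RDD reduceByKey result only when all values for a key live in the same partition, as in the repartition(1) example above.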