Posted to commits@spark.apache.org by jo...@apache.org on 2015/01/28 21:47:36 UTC

spark git commit: [SPARK-5440][pyspark] Add toLocalIterator to pyspark rdd

Repository: spark
Updated Branches:
  refs/heads/master 9b18009b8 -> 456c11f15


[SPARK-5440][pyspark] Add toLocalIterator to pyspark rdd

Since the Java and Scala APIs can both iterate over an RDD's partitions via the "toLocalIterator" method, Python should have the same ability.
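
For context, a minimal usage sketch (an editorial addition, not part of the commit), assuming a live SparkContext bound to the name sc, as in the PySpark shell:

    # Assumes a running SparkContext named sc (e.g. the PySpark shell).
    # Unlike collect(), which materializes the whole RDD on the driver,
    # toLocalIterator() pulls one partition at a time.
    rdd = sc.parallelize(range(100), 4)   # 4 partitions of 25 elements each
    total = 0
    for x in rdd.toLocalIterator():       # one partition in memory at a time
        total += x
    print total                           # 4950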

Author: Michael Nazario <mn...@palantir.com>

Closes #4237 from mnazario/feature/toLocalIterator and squashes the following commits:

1c58526 [Michael Nazario] Fix documentation off-by-one error
0cdc8f8 [Michael Nazario] Add toLocalIterator to PySpark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/456c11f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/456c11f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/456c11f1

Branch: refs/heads/master
Commit: 456c11f15aec809044d8bdbdcce0ae05533fb44b
Parents: 9b18009
Author: Michael Nazario <mn...@palantir.com>
Authored: Wed Jan 28 12:47:12 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Wed Jan 28 12:47:12 2015 -0800

----------------------------------------------------------------------
 python/pyspark/rdd.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/456c11f1/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index efd2f35..014c0aa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2059,6 +2059,20 @@ class RDD(object):
         hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF)
         return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)
 
+    def toLocalIterator(self):
+        """
+        Return an iterator that contains all of the elements in this RDD.
+        The iterator will consume as much memory as the largest partition in this RDD.
+        >>> rdd = sc.parallelize(range(10))
+        >>> [x for x in rdd.toLocalIterator()]
+        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
+        """
+        partitions = xrange(self.getNumPartitions())
+        for partition in partitions:
+            rows = self.context.runJob(self, lambda x: x, [partition])
+            for row in rows:
+                yield row
+
 
 class PipelinedRDD(RDD):
 

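Implementation note (editorial, not part of the commit): each pass through the loop above launches a separate Spark job via SparkContext.runJob on a single partition, so the driver holds at most one partition's rows at a time. The trade-off is one job per partition; a sketch contrasting it with collect(), again assuming a live SparkContext named sc:

    rdd = sc.parallelize(xrange(10 ** 6), 100)

    # collect() materializes all 1,000,000 elements on the driver at once:
    biggest = max(rdd.collect())

    # toLocalIterator() keeps roughly one partition (~10,000 elements) on the
    # driver at a time, but runs 100 jobs, one per partition:
    biggest = max(rdd.toLocalIterator())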
