Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/17 06:23:40 UTC

[3/4] git commit: Spark-1163, Added missing Python RDD functions

Spark-1163, Added missing Python RDD functions

Author: prabinb <pr...@imaginea.com>

Closes #92 from prabinb/python-api-rdd and squashes the following commits:

51129ca [prabinb] Added missing Python RDD functions; added a __repr__ method to the StorageLevel class; added a doctest for RDD.getStorageLevel().
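
For a quick sense of the added API, a usage sketch (assumes an interactive
PySpark shell with a SparkContext bound to `sc`, as in the doctests in the
diff below):

    >>> rdd = sc.parallelize(range(8), 4)
    >>> rdd.setName('numbers')
    >>> rdd.name()
    'numbers'
    >>> len(rdd.coalesce(2).glom().collect())     # shrink without a shuffle
    2
    >>> len(rdd.repartition(8).glom().collect())  # always shuffles
    8
    >>> rdd.getStorageLevel()                     # not persisted by default
    StorageLevel(False, False, False, 1)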

Conflicts:

	python/pyspark/rdd.py


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/249930ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/249930ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/249930ae

Branch: refs/heads/branch-0.9
Commit: 249930aeaca6cfa7592e77fc0472069c1a63fdb6
Parents: 4480505
Author: prabinb <pr...@imaginea.com>
Authored: Tue Mar 11 23:57:05 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Mar 16 22:14:53 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py          | 90 +++++++++++++++++++++++++++++++++++++
 python/pyspark/storagelevel.py |  4 ++
 2 files changed, 94 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/249930ae/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 487bfb1..0fdb9e6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -36,6 +36,7 @@ from pyspark.join import python_join, python_left_outer_join, \
     python_right_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler
+from pyspark.storagelevel import StorageLevel
 
 from py4j.java_collections import ListConverter, MapConverter
 
@@ -1026,6 +1027,95 @@ class RDD(object):
         """
         return self.map(lambda x: (f(x), x))
 
+    def repartition(self, numPartitions):
+        """
+         Return a new RDD that has exactly numPartitions partitions.
+          
+         Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+         a shuffle to redistribute data.
+         If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+         which can avoid performing a shuffle.
+         >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
+         >>> sorted(rdd.glom().collect())
+         [[1], [2, 3], [4, 5], [6, 7]]
+         >>> len(rdd.repartition(2).glom().collect())
+         2
+         >>> len(rdd.repartition(10).glom().collect())
+         10
+        """
+        jrdd = self._jrdd.repartition(numPartitions)
+        return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+
+    def coalesce(self, numPartitions, shuffle=False):
+        """
+        Return a new RDD that is reduced into `numPartitions` partitions.
+
+        If `shuffle` is True, the data is redistributed with a shuffle,
+        which also allows increasing the number of partitions.
+
+        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
+        [[1], [2, 3], [4, 5]]
+        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
+        [[1, 2, 3, 4, 5]]
+        """
+        jrdd = self._jrdd.coalesce(numPartitions, shuffle)
+        return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+
+    def zip(self, other):
+        """
+        Zips this RDD with another one, returning key-value pairs where the
+        first pair holds the first element of each RDD, the second pair the
+        second elements, and so on. Assumes that the two RDDs have the same
+        number of partitions and the same number of elements in each
+        partition (e.g. one was made through a map on the other).
+
+        >>> x = sc.parallelize(range(0,5))
+        >>> y = sc.parallelize(range(1000, 1005))
+        >>> x.zip(y).collect()
+        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
+        """
+        pairRDD = self._jrdd.zip(other._jrdd)
+        deserializer = PairDeserializer(self._jrdd_deserializer,
+                                        other._jrdd_deserializer)
+        return RDD(pairRDD, self.ctx, deserializer)
+
+    def name(self):
+        """
+        Return the name of this RDD.
+        """
+        name_ = self._jrdd.name()
+        if not name_:
+            return None
+        return name_.encode('utf-8')
+
+    def setName(self, name):
+        """
+        Assign a name to this RDD.
+
+        >>> rdd1 = sc.parallelize([1, 2])
+        >>> rdd1.setName('RDD1')
+        >>> rdd1.name()
+        'RDD1'
+        """
+        self._jrdd.setName(name)
+
+    def toDebugString(self):
+        """
+        A description of this RDD and its recursive dependencies for debugging.
+        """
+        debug_string = self._jrdd.toDebugString()
+        if not debug_string:
+            return None
+        return debug_string.encode('utf-8')
+
+    def getStorageLevel(self):
+        """
+        Get the RDD's current storage level.
+
+        >>> rdd1 = sc.parallelize([1, 2])
+        >>> rdd1.getStorageLevel()
+        StorageLevel(False, False, False, 1)
+        """
+        java_storage_level = self._jrdd.getStorageLevel()
+        storage_level = StorageLevel(java_storage_level.useDisk(),
+                                     java_storage_level.useMemory(),
+                                     java_storage_level.deserialized(),
+                                     java_storage_level.replication())
+        return storage_level
+
     # TODO: `lookup` is disabled because we can't make direct comparisons based
     # on the key; we need to compare the hash of the key to the hash of the
     # keys in the pairs.  This could be an expensive operation, since those
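
A note on zip's alignment assumption: the simplest way to satisfy it is to
derive one RDD from the other with a map, which preserves the partition
count and per-partition sizes. A minimal sketch (not part of this patch):

    >>> x = sc.parallelize(range(5), 2)
    >>> y = x.map(lambda n: n * 10)   # same partitioning as x
    >>> x.zip(y).collect()
    [(0, 0), (1, 10), (2, 20), (3, 30), (4, 40)]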

http://git-wip-us.apache.org/repos/asf/spark/blob/249930ae/python/pyspark/storagelevel.py
----------------------------------------------------------------------
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index b31f476..c3e3a44 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -31,6 +31,10 @@ class StorageLevel:
         self.deserialized = deserialized
         self.replication = replication
 
+    def __repr__(self):
+        return "StorageLevel(%s, %s, %s, %s)" % (
+            self.useDisk, self.useMemory, self.deserialized, self.replication)
+
 StorageLevel.DISK_ONLY = StorageLevel(True, False, False)
 StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2)
 StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True)
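
With the __repr__ added above, the predefined levels print their flags
directly in the shell; the argument order is (useDisk, useMemory,
deserialized, replication), with replication defaulting to 1. For example:

    >>> from pyspark.storagelevel import StorageLevel
    >>> StorageLevel.DISK_ONLY_2
    StorageLevel(True, False, False, 2)
    >>> StorageLevel.MEMORY_ONLY
    StorageLevel(False, True, True, 1)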