Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/17 06:23:40 UTC
[3/4] git commit: Spark-1163, Added missing Python RDD functions
Spark-1163, Added missing Python RDD functions
Author: prabinb <pr...@imaginea.com>
Closes #92 from prabinb/python-api-rdd and squashes the following commits:
51129ca [prabinb] Added missing Python RDD functions. Added __repr__ function to StorageLevel class. Added doctest for RDD.getStorageLevel().
Conflicts:
python/pyspark/rdd.py
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/249930ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/249930ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/249930ae
Branch: refs/heads/branch-0.9
Commit: 249930aeaca6cfa7592e77fc0472069c1a63fdb6
Parents: 4480505
Author: prabinb <pr...@imaginea.com>
Authored: Tue Mar 11 23:57:05 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Mar 16 22:14:53 2014 -0700
----------------------------------------------------------------------
python/pyspark/rdd.py | 90 +++++++++++++++++++++++++++++++++++++
python/pyspark/storagelevel.py | 4 ++
2 files changed, 94 insertions(+)
----------------------------------------------------------------------
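Taken together, the new methods bring the Python RDD API closer to its Scala counterpart. As a quick illustration of a few of them (a sketch in the same doctest style the patch itself uses, assuming a running SparkContext bound to `sc`):

>>> rdd = sc.parallelize(range(10), 5)
>>> len(rdd.coalesce(2).glom().collect())
2
>>> rdd.setName('numbers')
>>> rdd.name()
'numbers'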
http://git-wip-us.apache.org/repos/asf/spark/blob/249930ae/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 487bfb1..0fdb9e6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -36,6 +36,7 @@ from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
+from pyspark.storagelevel import StorageLevel
from py4j.java_collections import ListConverter, MapConverter
@@ -1026,6 +1027,95 @@ class RDD(object):
"""
return self.map(lambda x: (f(x), x))
+ def repartition(self, numPartitions):
+ """
+ Return a new RDD that has exactly numPartitions partitions.
+
+ Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+ a shuffle to redistribute data.
+ If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+ which can avoid performing a shuffle.
+ >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
+ >>> sorted(rdd.glom().collect())
+ [[1], [2, 3], [4, 5], [6, 7]]
+ >>> len(rdd.repartition(2).glom().collect())
+ 2
+ >>> len(rdd.repartition(10).glom().collect())
+ 10
+ """
+ jrdd = self._jrdd.repartition(numPartitions)
+ return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+
+ def coalesce(self, numPartitions, shuffle=False):
+ """
+ Return a new RDD that is reduced into `numPartitions` partitions.
+ >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
+ [[1], [2, 3], [4, 5]]
+ >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
+ [[1, 2, 3, 4, 5]]
+ """
+ jrdd = self._jrdd.coalesce(numPartitions, shuffle)
+ return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+
+ def zip(self, other):
+ """
+ Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
+ second element in each RDD, etc. Assumes that the two RDDs have the same number of
+ partitions and the same number of elements in each partition (e.g. one was made through
+ a map on the other).
+
+ >>> x = sc.parallelize(range(0,5))
+ >>> y = sc.parallelize(range(1000, 1005))
+ >>> x.zip(y).collect()
+ [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
+ """
+ pairRDD = self._jrdd.zip(other._jrdd)
+ deserializer = PairDeserializer(self._jrdd_deserializer,
+ other._jrdd_deserializer)
+ return RDD(pairRDD, self.ctx, deserializer)
+
+ def name(self):
+ """
+ Return the name of this RDD.
+ """
+ name_ = self._jrdd.name()
+ if not name_:
+ return None
+ return name_.encode('utf-8')
+
+ def setName(self, name):
+ """
+ Assign a name to this RDD.
+ >>> rdd1 = sc.parallelize([1,2])
+ >>> rdd1.setName('RDD1')
+ >>> rdd1.name()
+ 'RDD1'
+ """
+ self._jrdd.setName(name)
+
+ def toDebugString(self):
+ """
+ A description of this RDD and its recursive dependencies for debugging.
+ """
+ debug_string = self._jrdd.toDebugString()
+ if not debug_string:
+ return None
+ return debug_string.encode('utf-8')
+
+ def getStorageLevel(self):
+ """
+ Get the RDD's current storage level.
+ >>> rdd1 = sc.parallelize([1,2])
+ >>> rdd1.getStorageLevel()
+ StorageLevel(False, False, False, 1)
+ """
+ java_storage_level = self._jrdd.getStorageLevel()
+ storage_level = StorageLevel(java_storage_level.useDisk(),
+ java_storage_level.useMemory(),
+ java_storage_level.deserialized(),
+ java_storage_level.replication())
+ return storage_level
+
# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those
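Each of the new methods above follows the same thin-wrapper pattern: delegate to the JVM-side JavaRDD through the py4j proxy `self._jrdd`, then re-wrap the result as a Python RDD with an appropriate deserializer. A minimal sketch of that pattern (the method name `someTransformation` is hypothetical, for illustration only):

    def some_transformation(self, *args):
        # Call through the py4j gateway to the JVM-side JavaRDD.
        jrdd = self._jrdd.someTransformation(*args)
        # Re-wrap the returned Java object, reusing this RDD's own
        # deserializer so elements round-trip correctly.
        return RDD(jrdd, self.ctx, self._jrdd_deserializer)

zip() is the one exception: since it combines two element streams, it builds a PairDeserializer from both inputs' deserializers rather than reusing a single one.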
http://git-wip-us.apache.org/repos/asf/spark/blob/249930ae/python/pyspark/storagelevel.py
----------------------------------------------------------------------
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index b31f476..c3e3a44 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -31,6 +31,10 @@ class StorageLevel:
self.deserialized = deserialized
self.replication = replication
+ def __repr__(self):
+ return "StorageLevel(%s, %s, %s, %s)" % (
+ self.useDisk, self.useMemory, self.deserialized, self.replication)
+
StorageLevel.DISK_ONLY = StorageLevel(True, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2)
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True)
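With __repr__ defined, the predefined storage levels print readably instead of as bare object addresses. For example (a sketch, assuming the constants above; replication defaults to 1 when omitted):

>>> from pyspark.storagelevel import StorageLevel
>>> StorageLevel.DISK_ONLY_2
StorageLevel(True, False, False, 2)
>>> StorageLevel.MEMORY_ONLY
StorageLevel(False, True, True, 1)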