You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/17 06:23:38 UTC

[1/4] git commit: Updated link for pyspark examples in docs

Repository: spark
Updated Branches:
  refs/heads/branch-0.9 ef74e44e0 -> 1dc1e988f


Updated link for pyspark examples in docs

Author: Jyotiska NK <jy...@gmail.com>

Closes #22 from jyotiska/pyspark_docs and squashes the following commits:

426136c [Jyotiska NK] Updated link for pyspark examples


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e74e79a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e74e79a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e74e79a0

Branch: refs/heads/branch-0.9
Commit: e74e79a06fc17a4ea71c882de2549bbd4dbf76c5
Parents: ef74e44
Author: Jyotiska NK <jy...@gmail.com>
Authored: Wed Feb 26 21:37:04 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Mar 16 22:12:51 2014 -0700

----------------------------------------------------------------------
 docs/python-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e74e79a0/docs/python-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 7c5283f..57ed54c 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -157,7 +157,7 @@ some example applications.
 
 # Where to Go from Here
 
-PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/incubator-spark/tree/master/python/examples).
+PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/spark/tree/master/python/examples).
 You can run them by passing the files to `pyspark`; e.g.:
 
     ./bin/pyspark python/examples/wordcount.py


[3/4] git commit: Spark-1163, Added missing Python RDD functions

Posted by pw...@apache.org.
Spark-1163, Added missing Python RDD functions

Author: prabinb <pr...@imaginea.com>

Closes #92 from prabinb/python-api-rdd and squashes the following commits:

51129ca [prabinb] Added missing Python RDD functions. Added __repr__ function to StorageLevel class. Added doctest for RDD.getStorageLevel().

Conflicts:

	python/pyspark/rdd.py


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/249930ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/249930ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/249930ae

Branch: refs/heads/branch-0.9
Commit: 249930aeaca6cfa7592e77fc0472069c1a63fdb6
Parents: 4480505
Author: prabinb <pr...@imaginea.com>
Authored: Tue Mar 11 23:57:05 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Mar 16 22:14:53 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py          | 90 +++++++++++++++++++++++++++++++++++++
 python/pyspark/storagelevel.py |  4 ++
 2 files changed, 94 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/249930ae/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 487bfb1..0fdb9e6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -36,6 +36,7 @@ from pyspark.join import python_join, python_left_outer_join, \
     python_right_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler
+from pyspark.storagelevel import StorageLevel
 
 from py4j.java_collections import ListConverter, MapConverter
 
@@ -1026,6 +1027,95 @@ class RDD(object):
         """
         return self.map(lambda x: (f(x), x))
 
+    def repartition(self, numPartitions):
+        """
+        Return a new RDD that has exactly numPartitions partitions.
+
+        Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+        a shuffle to redistribute data.
+        If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+        which can avoid performing a shuffle.
+        >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
+        >>> sorted(rdd.glom().collect())
+        [[1], [2, 3], [4, 5], [6, 7]]
+        >>> len(rdd.repartition(2).glom().collect())
+        2
+        >>> len(rdd.repartition(10).glom().collect())
+        10
+        """
+        jrdd = self._jrdd.repartition(numPartitions)
+        return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+
+    def coalesce(self, numPartitions, shuffle=False):
+        """
+        Return a new RDD that is reduced into `numPartitions` partitions.
+        `shuffle` is forwarded to the underlying Java RDD; if True, a shuffle step is performed.
+        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
+        [[1], [2, 3], [4, 5]]
+        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
+        [[1, 2, 3, 4, 5]]
+        """
+        return RDD(self._jrdd.coalesce(numPartitions, shuffle), self.ctx, self._jrdd_deserializer)
+
+    def zip(self, other):
+        """
+        Zips this RDD with another one, returning key-value pairs with the first element in
+        each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same
+        number of partitions and the same number of elements in each partition (e.g. one was
+        made through a map on the other).
+
+        >>> x = sc.parallelize(range(0,5))
+        >>> y = sc.parallelize(range(1000, 1005))
+        >>> x.zip(y).collect()
+        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
+        """
+        pairRDD = self._jrdd.zip(other._jrdd)
+        deserializer = PairDeserializer(self._jrdd_deserializer,
+                                             other._jrdd_deserializer)
+        return RDD(pairRDD, self.ctx, deserializer)
+
+    def name(self):
+        """
+        Return the name of this RDD, UTF-8 encoded, or None if the name is unset or empty.
+        """
+        name_ = self._jrdd.name()
+        if not name_:
+            return None
+        return name_.encode('utf-8')
+
+    def setName(self, name):
+        """
+        Assign a name to this RDD by setting it on the underlying Java RDD; returns None.
+        >>> rdd1 = sc.parallelize([1,2])
+        >>> rdd1.setName('RDD1')
+        >>> rdd1.name()
+        'RDD1'
+        """
+        self._jrdd.setName(name)
+
+    def toDebugString(self):
+        """
+        Return a UTF-8 encoded description of this RDD and its recursive dependencies, or None.
+        """
+        debug_string = self._jrdd.toDebugString()
+        if not debug_string:
+            return None
+        return debug_string.encode('utf-8')
+
+    def getStorageLevel(self):
+        """
+        Get the RDD's current storage level as a Python StorageLevel built from the Java one.
+        >>> rdd1 = sc.parallelize([1,2])
+        >>> rdd1.getStorageLevel()
+        StorageLevel(False, False, False, 1)
+        """
+        java_storage_level = self._jrdd.getStorageLevel()
+        storage_level = StorageLevel(java_storage_level.useDisk(),
+                                     java_storage_level.useMemory(),
+                                     java_storage_level.deserialized(),
+                                     java_storage_level.replication())
+        return storage_level
+
     # TODO: `lookup` is disabled because we can't make direct comparisons based
     # on the key; we need to compare the hash of the key to the hash of the
     # keys in the pairs.  This could be an expensive operation, since those

http://git-wip-us.apache.org/repos/asf/spark/blob/249930ae/python/pyspark/storagelevel.py
----------------------------------------------------------------------
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index b31f476..c3e3a44 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -31,6 +31,10 @@ class StorageLevel:
         self.deserialized = deserialized
         self.replication = replication
 
+    def __repr__(self):
+        fields = (self.useDisk, self.useMemory, self.deserialized, self.replication)
+        return "StorageLevel(%s, %s, %s, %s)" % fields
+
 StorageLevel.DISK_ONLY = StorageLevel(True, False, False)
 StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2)
 StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True)


[4/4] git commit: SPARK-977 Added Python RDD.zip function

Posted by pw...@apache.org.
SPARK-977 Added Python RDD.zip function

This was raised earlier as part of apache/incubator-spark#486.

Author: Prabin Banka <pr...@imaginea.com>

Closes #76 from prabinb/python-api-zip and squashes the following commits:

b1a31a0 [Prabin Banka] Added Python RDD.zip function

Conflicts:

	python/pyspark/rdd.py


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1dc1e988
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1dc1e988
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1dc1e988

Branch: refs/heads/branch-0.9
Commit: 1dc1e988fd379bce8f661091bd8224724a7345a3
Parents: 249930a
Author: Prabin Banka <pr...@imaginea.com>
Authored: Mon Mar 10 13:27:00 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Mar 16 22:16:17 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py         |  2 +-
 python/pyspark/serializers.py | 29 ++++++++++++++++++++++++++++-
 2 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1dc1e988/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 0fdb9e6..c29cefa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -31,7 +31,7 @@ import warnings
 from heapq import heappush, heappop, heappushpop
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
-    BatchedSerializer, CloudPickleSerializer, pack_long
+    BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
 from pyspark.join import python_join, python_left_outer_join, \
     python_right_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter

http://git-wip-us.apache.org/repos/asf/spark/blob/1dc1e988/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 8c6ad79..12c63f1 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -204,7 +204,7 @@ class CartesianDeserializer(FramedSerializer):
         self.key_ser = key_ser
         self.val_ser = val_ser
 
-    def load_stream(self, stream):
+    def prepare_keys_values(self, stream):
         key_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_stream = self.val_ser._load_stream_without_unbatching(stream)
         key_is_batched = isinstance(self.key_ser, BatchedSerializer)
@@ -212,6 +212,10 @@ class CartesianDeserializer(FramedSerializer):
         for (keys, vals) in izip(key_stream, val_stream):
             keys = keys if key_is_batched else [keys]
             vals = vals if val_is_batched else [vals]
+            yield (keys, vals)
+
+    def load_stream(self, stream):
+        for (keys, vals) in self.prepare_keys_values(stream):
             for pair in product(keys, vals):
                 yield pair
 
@@ -224,6 +228,29 @@ class CartesianDeserializer(FramedSerializer):
                (str(self.key_ser), str(self.val_ser))
 
 
+class PairDeserializer(CartesianDeserializer):
+    """
+    Deserializes the JavaRDD zip() of two PythonRDDs, pairing keys with values element-wise.
+    """
+
+    def __init__(self, key_ser, val_ser):
+        self.key_ser = key_ser
+        self.val_ser = val_ser
+
+    def load_stream(self, stream):
+        for (keys, vals) in self.prepare_keys_values(stream):
+            for pair in izip(keys, vals):
+                yield pair
+
+    def __eq__(self, other):
+        return isinstance(other, PairDeserializer) and \
+               self.key_ser == other.key_ser and self.val_ser == other.val_ser
+
+    def __str__(self):
+        return "PairDeserializer<%s, %s>" % \
+               (str(self.key_ser), str(self.val_ser))
+
+
 class NoOpSerializer(FramedSerializer):
 
     def loads(self, obj): return obj


[2/4] git commit: SPARK-1168, Added foldByKey to pyspark.

Posted by pw...@apache.org.
SPARK-1168, Added foldByKey to pyspark.

Author: Prashant Sharma <pr...@imaginea.com>

Closes #115 from ScrapCodes/SPARK-1168/pyspark-foldByKey and squashes the following commits:

db6f67e [Prashant Sharma] SPARK-1168, Added foldByKey to pyspark.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44805058
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44805058
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44805058

Branch: refs/heads/branch-0.9
Commit: 44805058fc054f5d2f05a763bb3735c5b1afefb8
Parents: e74e79a
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Mon Mar 10 13:37:11 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Mar 16 22:13:33 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/44805058/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 678b005..487bfb1 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -910,7 +910,21 @@ class RDD(object):
                     combiners[k] = mergeCombiners(combiners[k], v)
             return combiners.iteritems()
         return shuffled.mapPartitions(_mergeCombiners)
+    
+    def foldByKey(self, zeroValue, func, numPartitions=None):
+        """
+        Merge the values for each key using an associative function "func" and a neutral "zeroValue"
+        which may be added to the result an arbitrary number of times, and must not change
+        the result (e.g., 0 for addition, or 1 for multiplication).
 
+        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
+        >>> from operator import add
+        >>> sorted(rdd.foldByKey(0, add).collect())
+        [('a', 2), ('b', 1)]
+        """
+        return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
+    
+    
     # TODO: support variant with custom partitioner
     def groupByKey(self, numPartitions=None):
         """