Posted to commits@spark.apache.org by ma...@apache.org on 2014/03/10 21:37:17 UTC

git commit: SPARK-1168, Added foldByKey to pyspark.

Repository: spark
Updated Branches:
  refs/heads/master f5518989b -> a59419c27


SPARK-1168, Added foldByKey to pyspark.

Author: Prashant Sharma <pr...@imaginea.com>

Closes #115 from ScrapCodes/SPARK-1168/pyspark-foldByKey and squashes the following commits:

db6f67e [Prashant Sharma] SPARK-1168, Added foldByKey to pyspark.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a59419c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a59419c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a59419c2

Branch: refs/heads/master
Commit: a59419c27e45f06be5143c58d48affb0a5158bdf
Parents: f551898
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Mon Mar 10 13:37:11 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Mon Mar 10 13:37:11 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a59419c2/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index e1043ad..39916d2 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -946,7 +946,21 @@ class RDD(object):
                     combiners[k] = mergeCombiners(combiners[k], v)
             return combiners.iteritems()
         return shuffled.mapPartitions(_mergeCombiners)
+
+    def foldByKey(self, zeroValue, func, numPartitions=None):
+        """
+        Merge the values for each key using an associative function "func" and a neutral "zeroValue"
+        which may be added to the result an arbitrary number of times, and must not change
+        the result (e.g., 0 for addition, or 1 for multiplication).
 
+        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
+        >>> from operator import add
+        >>> rdd.foldByKey(0, add).collect()
+        [('a', 2), ('b', 1)]
+        """
+        return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
+
+
     # TODO: support variant with custom partitioner
     def groupByKey(self, numPartitions=None):
         """