Posted to commits@spark.apache.org by da...@apache.org on 2015/09/22 08:21:29 UTC

spark git commit: [SPARK-9821] [PYSPARK] pyspark-reduceByKey-should-take-a-custom-partitioner

Repository: spark
Updated Branches:
  refs/heads/master c986e933a -> 1cd674157


[SPARK-9821] [PYSPARK] pyspark-reduceByKey-should-take-a-custom-partitioner

from the issue:

In Scala, I can supply a custom partitioner to reduceByKey (and other aggregation/repartitioning methods like aggregateByKey and combineByKey), but as far as I can tell from the PySpark API, there's no way to do the same in Python.
Here's an example of my code in Scala:
weblogs.map(s => (getFileType(s), 1)).reduceByKey(new FileTypePartitioner(), _ + _)
But I can't figure out how to do the same in Python. The closest I can get is to call repartition before reduceByKey like so:
weblogs.map(lambda s: (getFileType(s), 1)).partitionBy(3, hash_filetype).reduceByKey(lambda v1, v2: v1 + v2).collect()
But that defeats the purpose, because I'm shuffling twice instead of once, so my performance is worse instead of better.
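With this change, the partition function can be passed straight to reduceByKey, so the aggregation shuffles only once. A minimal sketch of the new call, reusing the reporter's hypothetical weblogs RDD and helpers (getFileType and hash_filetype are from the issue, not part of Spark):

    # Hypothetical helpers from the issue report, shown only for illustration.
    def getFileType(line):
        return line.rsplit(".", 1)[-1]

    def hash_filetype(file_type):
        return hash(file_type)

    # Single shuffle: reduceByKey(func, numPartitions, partitionFunc)
    counts = (weblogs
              .map(lambda s: (getFileType(s), 1))
              .reduceByKey(lambda v1, v2: v1 + v2, 3, hash_filetype)
              .collect())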

Author: Holden Karau <ho...@pigscanfly.ca>

Closes #8569 from holdenk/SPARK-9821-pyspark-reduceByKey-should-take-a-custom-partitioner.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cd67415
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cd67415
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cd67415

Branch: refs/heads/master
Commit: 1cd67415728e660a90e4dbe136272b5d6b8f1142
Parents: c986e93
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Mon Sep 21 23:21:24 2015 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Mon Sep 21 23:21:24 2015 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 29 ++++++++++++++++-------------
 1 file changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1cd67415/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 73d7d9a..56e8922 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -686,7 +686,7 @@ class RDD(object):
                                              other._jrdd_deserializer)
         return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)
 
-    def groupBy(self, f, numPartitions=None):
+    def groupBy(self, f, numPartitions=None, partitionFunc=portable_hash):
         """
         Return an RDD of grouped items.
 
@@ -695,7 +695,7 @@ class RDD(object):
         >>> sorted([(x, sorted(y)) for (x, y) in result])
         [(0, [2, 8]), (1, [1, 1, 3, 5])]
         """
-        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
+        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions, partitionFunc)
 
     @ignore_unicode_prefix
     def pipe(self, command, env=None, checkCode=False):
@@ -1539,22 +1539,23 @@ class RDD(object):
         """
         return self.map(lambda x: x[1])
 
-    def reduceByKey(self, func, numPartitions=None):
+    def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
         """
         Merge the values for each key using an associative reduce function.
 
         This will also perform the merging locally on each mapper before
         sending results to a reducer, similarly to a "combiner" in MapReduce.
 
-        Output will be hash-partitioned with C{numPartitions} partitions, or
+        Output will be partitioned with C{numPartitions} partitions, or
         the default parallelism level if C{numPartitions} is not specified.
+        The default partitioner is hash partitioning.
 
         >>> from operator import add
         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> sorted(rdd.reduceByKey(add).collect())
         [('a', 2), ('b', 1)]
         """
-        return self.combineByKey(lambda x: x, func, func, numPartitions)
+        return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
 
     def reduceByKeyLocally(self, func):
         """
@@ -1739,7 +1740,7 @@ class RDD(object):
 
     # TODO: add control over map-side aggregation
     def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
-                     numPartitions=None):
+                     numPartitions=None, partitionFunc=portable_hash):
         """
         Generic function to combine the elements for each key using a custom
         set of aggregation functions.
@@ -1777,7 +1778,7 @@ class RDD(object):
             return merger.items()
 
         locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True)
-        shuffled = locally_combined.partitionBy(numPartitions)
+        shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)
 
         def _mergeCombiners(iterator):
             merger = ExternalMerger(agg, memory, serializer)
@@ -1786,7 +1787,8 @@ class RDD(object):
 
         return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)
 
-    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
+    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None,
+                       partitionFunc=portable_hash):
         """
         Aggregate the values of each key, using given combine functions and a neutral
         "zero value". This function can return a different result type, U, than the type
@@ -1800,9 +1802,9 @@ class RDD(object):
             return copy.deepcopy(zeroValue)
 
         return self.combineByKey(
-            lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
+            lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions, partitionFunc)
 
-    def foldByKey(self, zeroValue, func, numPartitions=None):
+    def foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=portable_hash):
         """
         Merge the values for each key using an associative function "func"
         and a neutral "zeroValue" which may be added to the result an
@@ -1817,13 +1819,14 @@ class RDD(object):
         def createZero():
             return copy.deepcopy(zeroValue)
 
-        return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
+        return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions,
+                                 partitionFunc)
 
     def _memory_limit(self):
         return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
 
     # TODO: support variant with custom partitioner
-    def groupByKey(self, numPartitions=None):
+    def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
         """
         Group the values for each key in the RDD into a single sequence.
         Hash-partitions the resulting RDD with numPartitions partitions.
@@ -1859,7 +1862,7 @@ class RDD(object):
             return merger.items()
 
         locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
-        shuffled = locally_combined.partitionBy(numPartitions)
+        shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)
 
         def groupByKey(it):
             merger = ExternalGroupBy(agg, memory, serializer)

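For reference, each *ByKey method touched above now accepts an optional partitionFunc alongside numPartitions. The sketch below is not part of the patch; it assumes an existing SparkContext named sc and uses a toy partition function only to show the new parameter on aggregateByKey and groupByKey:

    pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

    # Route keys to partitions by the first character's code point
    # instead of the default portable_hash.
    def first_char_partitioner(key):
        return ord(key[0])

    # aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions, partitionFunc)
    sums = pairs.aggregateByKey(0, lambda acc, v: acc + v, lambda a, b: a + b,
                                2, first_char_partitioner)
    sorted(sums.collect())    # [('a', 2), ('b', 1)]

    # groupByKey(numPartitions, partitionFunc)
    groups = pairs.groupByKey(2, first_char_partitioner).mapValues(list)
    sorted(groups.collect())  # [('a', [1, 1]), ('b', [1])]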

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org