Posted to commits@spark.apache.org by rx...@apache.org on 2014/06/20 09:07:00 UTC

git commit: SPARK-2203: PySpark defaults to use same num reduce partitions as map side

Repository: spark
Updated Branches:
  refs/heads/master c55bbb49f -> f46e02fcd


SPARK-2203: PySpark defaults to use same num reduce partitions as map side

For shuffle-based operators such as rdd.groupBy() or rdd.sortByKey(), PySpark always assumes that the default parallelism to use for the reduce side is ctx.defaultParallelism, a constant typically determined by the number of cores in the cluster.

In contrast, Spark's Partitioner#defaultPartitioner uses the same number of reduce partitions as map partitions unless spark.default.parallelism is explicitly set. This tends to be a better default for avoiding OOMs, and should also be the behavior of PySpark.
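
For illustration only, here is a minimal PySpark sketch of the resulting behavior. It assumes a local SparkContext named sc, that spark.default.parallelism has not been set, and the app name and partition counts are made up for the example:

    from pyspark import SparkContext

    sc = SparkContext("local[4]", "reduce-partition-demo")

    # Map side: 8 partitions.
    pairs = sc.parallelize([(i % 3, i) for i in range(100)], 8)

    # With this change, omitting numPartitions makes the reduce side reuse
    # the map-side partition count instead of ctx.defaultParallelism.
    print(pairs.groupByKey().getNumPartitions())   # expected: 8

    # Passing numPartitions explicitly still takes precedence.
    print(pairs.groupByKey(numPartitions=2).getNumPartitions())   # expected: 2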

JIRA: https://issues.apache.org/jira/browse/SPARK-2203

Author: Aaron Davidson <aa...@databricks.com>

Closes #1138 from aarondav/pyfix and squashes the following commits:

1bd5751 [Aaron Davidson] SPARK-2203: PySpark defaults to use same num reduce partitions as map partitions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f46e02fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f46e02fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f46e02fc

Branch: refs/heads/master
Commit: f46e02fcdbb3f86a8761c078708388d18282ee0c
Parents: c55bbb4
Author: Aaron Davidson <aa...@databricks.com>
Authored: Fri Jun 20 00:06:57 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Fri Jun 20 00:06:57 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f46e02fc/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a0b2c74..62a95c8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -512,7 +512,7 @@ class RDD(object):
         [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
         """
         if numPartitions is None:
-            numPartitions = self.ctx.defaultParallelism
+            numPartitions = self._defaultReducePartitions()
 
         bounds = list()
 
@@ -1154,7 +1154,7 @@ class RDD(object):
         set([])
         """
         if numPartitions is None:
-            numPartitions = self.ctx.defaultParallelism
+            numPartitions = self._defaultReducePartitions()
 
         if partitionFunc is None:
             partitionFunc = lambda x: 0 if x is None else hash(x)
@@ -1212,7 +1212,7 @@ class RDD(object):
         [('a', '11'), ('b', '1')]
         """
         if numPartitions is None:
-            numPartitions = self.ctx.defaultParallelism
+            numPartitions = self._defaultReducePartitions()
         def combineLocally(iterator):
             combiners = {}
             for x in iterator:
@@ -1475,6 +1475,21 @@ class RDD(object):
                                      java_storage_level.replication())
         return storage_level
 
+    def _defaultReducePartitions(self):
+        """
+        Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
+        If spark.default.parallelism is set, then we'll use the value from SparkContext
+        defaultParallelism, otherwise we'll use the number of partitions in this RDD.
+
+        This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce
+        the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will
+        be inherent.
+        """
+        if self.ctx._conf.contains("spark.default.parallelism"):
+            return self.ctx.defaultParallelism
+        else:
+            return self.getNumPartitions()
+
     # TODO: `lookup` is disabled because we can't make direct comparisons based
     # on the key; we need to compare the hash of the key to the hash of the
     # keys in the pairs.  This could be an expensive operation, since those