You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/06/20 09:07:00 UTC
git commit: SPARK-2203: PySpark defaults to use same num reduce
partitions as map side
Repository: spark
Updated Branches:
refs/heads/master c55bbb49f -> f46e02fcd
SPARK-2203: PySpark defaults to use same num reduce partitions as map side
For shuffle-based operators, such as rdd.groupBy() or rdd.sortByKey(), PySpark will always assume that the default parallelism to use for the reduce side is ctx.defaultParallelism, which is a constant typically determined by the number of cores in cluster.
In contrast, Spark's Partitioner#defaultPartitioner will use the same number of reduce partitions as map partitions unless the defaultParallelism config is explicitly set. This tends to be a better default in order to avoid OOMs, and should also be the behavior of PySpark.
JIRA: https://issues.apache.org/jira/browse/SPARK-2203
Author: Aaron Davidson <aa...@databricks.com>
Closes #1138 from aarondav/pyfix and squashes the following commits:
1bd5751 [Aaron Davidson] SPARK-2203: PySpark defaults to use same num reduce partitions as map partitions
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f46e02fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f46e02fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f46e02fc
Branch: refs/heads/master
Commit: f46e02fcdbb3f86a8761c078708388d18282ee0c
Parents: c55bbb4
Author: Aaron Davidson <aa...@databricks.com>
Authored: Fri Jun 20 00:06:57 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Fri Jun 20 00:06:57 2014 -0700
----------------------------------------------------------------------
python/pyspark/rdd.py | 21 ++++++++++++++++++---
1 file changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f46e02fc/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a0b2c74..62a95c8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -512,7 +512,7 @@ class RDD(object):
[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
"""
if numPartitions is None:
- numPartitions = self.ctx.defaultParallelism
+ numPartitions = self._defaultReducePartitions()
bounds = list()
@@ -1154,7 +1154,7 @@ class RDD(object):
set([])
"""
if numPartitions is None:
- numPartitions = self.ctx.defaultParallelism
+ numPartitions = self._defaultReducePartitions()
if partitionFunc is None:
partitionFunc = lambda x: 0 if x is None else hash(x)
@@ -1212,7 +1212,7 @@ class RDD(object):
[('a', '11'), ('b', '1')]
"""
if numPartitions is None:
- numPartitions = self.ctx.defaultParallelism
+ numPartitions = self._defaultReducePartitions()
def combineLocally(iterator):
combiners = {}
for x in iterator:
@@ -1475,6 +1475,21 @@ class RDD(object):
java_storage_level.replication())
return storage_level
+ def _defaultReducePartitions(self):
+ """
+ Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
+ If spark.default.parallelism is set, then we'll use the value from SparkContext
+ defaultParallelism, otherwise we'll use the number of partitions in this RDD.
+
+ This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce
+ the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will
+ be inherent.
+ """
+ if self.ctx._conf.contains("spark.default.parallelism"):
+ return self.ctx.defaultParallelism
+ else:
+ return self.getNumPartitions()
+
# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those