Posted to commits@spark.apache.org by fe...@apache.org on 2016/10/11 18:43:28 UTC
spark git commit: [SPARK-17817][PYSPARK] PySpark RDD Repartitioning
Results in Highly Skewed Partition Sizes
Repository: spark
Updated Branches:
refs/heads/master 75b9e3514 -> 07508bd01
[SPARK-17817][PYSPARK] PySpark RDD Repartitioning Results in Highly Skewed Partition Sizes
## What changes were proposed in this pull request?
Quoted from JIRA description:
Calling repartition on a PySpark RDD to increase the number of partitions results in highly skewed partition sizes, with most having 0 rows. The repartition method should evenly spread out the rows across the partitions, and this behavior is correctly seen on the Scala side.
Please reference the following code for a reproducible example of this issue:
num_partitions = 20000
a = sc.parallelize(range(int(1e6)), 2) # start with 2 even partitions
l = a.repartition(num_partitions).glom().map(len).collect() # get length of each partition
min(l), max(l), sum(l)/len(l), len(l) # skewed!
Scala's `repartition` code distributes elements evenly across the output partitions. However, an RDD coming from Python is serialized as binary data, so the JVM does not see the individual rows and the even distribution fails. We need to convert the Python RDD to an RDD of Java objects before repartitioning.
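The effect described above can be illustrated without Spark. The following is a minimal pure-Python sketch, not Spark's actual shuffle code: `distribute`, `batch`, and the batch size of 500 are hypothetical names and values chosen for illustration. It assumes the shuffle assigns whatever elements it sees to output partitions in round-robin order, and compares the case where it sees opaque batches of rows with the case where it sees individual rows.

```python
def distribute(elements, num_partitions, start=0):
    """Assign elements to partitions in round-robin order, starting at `start`."""
    partitions = [[] for _ in range(num_partitions)]
    for offset, element in enumerate(elements):
        partitions[(start + offset) % num_partitions].append(element)
    return partitions


def batch(rows, batch_size):
    """Group rows into lists of at most `batch_size` (a stand-in for serialized batches)."""
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]


rows = list(range(1000))
num_partitions = 20

# Case 1: the shuffle only sees opaque batches, so it can only move whole batches.
batched = distribute(batch(rows, batch_size=500), num_partitions)
batched_sizes = [sum(len(b) for b in part) for part in batched]

# Case 2: the shuffle sees individual rows and can spread them evenly.
row_sizes = [len(part) for part in distribute(rows, num_partitions)]

print(batched_sizes.count(0), max(batched_sizes))  # 18 500
print(row_sizes.count(0), max(row_sizes))          # 0 50
```

With only two batches to hand out, 18 of the 20 partitions stay empty, which mirrors the skew reported in the JIRA. Once the rows are visible individually, every partition receives 50 rows.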
## How was this patch tested?
Jenkins tests.
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #15389 from viirya/pyspark-rdd-repartition.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07508bd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07508bd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07508bd0
Branch: refs/heads/master
Commit: 07508bd01d16f3331be167ff92770d19c8b1f46a
Parents: 75b9e35
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Tue Oct 11 11:43:24 2016 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Tue Oct 11 11:43:24 2016 -0700
----------------------------------------------------------------------
python/pyspark/rdd.py | 13 ++++++++++---
python/pyspark/tests.py | 10 ++++++++++
2 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/07508bd0/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ed81eb1..0e2ae19 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2017,8 +2017,7 @@ class RDD(object):
         >>> len(rdd.repartition(10).glom().collect())
         10
         """
-        jrdd = self._jrdd.repartition(numPartitions)
-        return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+        return self.coalesce(numPartitions, shuffle=True)
 
     def coalesce(self, numPartitions, shuffle=False):
         """
@@ -2029,7 +2028,15 @@ class RDD(object):
         >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
         [[1, 2, 3, 4, 5]]
         """
-        jrdd = self._jrdd.coalesce(numPartitions, shuffle)
+        if shuffle:
+            # In Scala's repartition code, we will distribute elements evenly across output
+            # partitions. However, the RDD from Python is serialized as a single binary data,
+            # so the distribution fails and produces highly skewed partitions. We need to
+            # convert it to a RDD of java object before repartitioning.
+            data_java_rdd = self._to_java_object_rdd().coalesce(numPartitions, shuffle)
+            jrdd = self.ctx._jvm.SerDeUtil.javaToPython(data_java_rdd)
+        else:
+            jrdd = self._jrdd.coalesce(numPartitions, shuffle)
         return RDD(jrdd, self.ctx, self._jrdd_deserializer)
 
     def zip(self, other):
http://git-wip-us.apache.org/repos/asf/spark/blob/07508bd0/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index b075691..3e0bd16 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -914,6 +914,16 @@ class RDDTests(ReusedPySparkTestCase):
         self.assertEqual(partitions[0], [(0, 5), (0, 8), (2, 6)])
         self.assertEqual(partitions[1], [(1, 3), (3, 8), (3, 8)])
 
+    def test_repartition_no_skewed(self):
+        num_partitions = 20
+        a = self.sc.parallelize(range(int(1000)), 2)
+        l = a.repartition(num_partitions).glom().map(len).collect()
+        zeros = len([x for x in l if x == 0])
+        self.assertTrue(zeros == 0)
+        l = a.coalesce(num_partitions, True).glom().map(len).collect()
+        zeros = len([x for x in l if x == 0])
+        self.assertTrue(zeros == 0)
+
     def test_distinct(self):
         rdd = self.sc.parallelize((1, 2, 3)*10, 10)
         self.assertEqual(rdd.getNumPartitions(), 10)