Posted to commits@spark.apache.org by fe...@apache.org on 2016/10/11 18:43:28 UTC

spark git commit: [SPARK-17817][PYSPARK] PySpark RDD Repartitioning Results in Highly Skewed Partition Sizes

Repository: spark
Updated Branches:
  refs/heads/master 75b9e3514 -> 07508bd01


[SPARK-17817][PYSPARK] PySpark RDD Repartitioning Results in Highly Skewed Partition Sizes

## What changes were proposed in this pull request?

Quoted from the JIRA description:

Calling repartition on a PySpark RDD to increase the number of partitions results in highly skewed partition sizes, with most having 0 rows. The repartition method should evenly spread out the rows across the partitions, and this behavior is correctly seen on the Scala side.

The following code gives a reproducible example of this issue:

    num_partitions = 20000
    a = sc.parallelize(range(int(1e6)), 2)  # start with 2 even partitions
    l = a.repartition(num_partitions).glom().map(len).collect()  # get length of each partition
    min(l), max(l), sum(l)/len(l), len(l)  # skewed!
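
With 1e6 rows over 20000 partitions, an even spread works out to 50 rows per partition; on affected builds most partitions instead come back empty, with a few holding nearly all of the data.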

Scala's `repartition` code distributes elements evenly across the output partitions. However, a Python RDD reaches the JVM as serialized binary batches, each of which looks like a single element, so the distribution fails. We need to convert the Python RDD to an RDD of Java objects before repartitioning.
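
For clusters that cannot take this fix right away, a rough user-side workaround (a sketch, not part of this patch; it reuses `a` and `num_partitions` from the snippet above) is to force the shuffle through explicit per-element keys, since the Python-side `partitionBy` hashes keys element by element:

    # Hedged workaround sketch for affected versions: key each element by its
    # index, shuffle with partitionBy (hashing happens per element in Python),
    # then drop the keys again.
    indexed = a.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
    evened = indexed.partitionBy(num_partitions).values()
    l = evened.glom().map(len).collect()
    min(l), max(l)  # counts should now be roughly even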

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #15389 from viirya/pyspark-rdd-repartition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07508bd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07508bd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07508bd0

Branch: refs/heads/master
Commit: 07508bd01d16f3331be167ff92770d19c8b1f46a
Parents: 75b9e35
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Tue Oct 11 11:43:24 2016 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Tue Oct 11 11:43:24 2016 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py   | 13 ++++++++++---
 python/pyspark/tests.py | 10 ++++++++++
 2 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/07508bd0/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ed81eb1..0e2ae19 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2017,8 +2017,7 @@ class RDD(object):
          >>> len(rdd.repartition(10).glom().collect())
          10
         """
-        jrdd = self._jrdd.repartition(numPartitions)
-        return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+        return self.coalesce(numPartitions, shuffle=True)
 
     def coalesce(self, numPartitions, shuffle=False):
         """
@@ -2029,7 +2028,15 @@ class RDD(object):
         >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
         [[1, 2, 3, 4, 5]]
         """
-        jrdd = self._jrdd.coalesce(numPartitions, shuffle)
+        if shuffle:
+            # Scala's repartition code distributes elements evenly across output partitions.
+            # However, a Python RDD reaches the JVM as serialized binary batches that each
+            # look like a single element, so the distribution fails and produces highly
+            # skewed partitions. Convert to an RDD of Java objects before repartitioning.
+            data_java_rdd = self._to_java_object_rdd().coalesce(numPartitions, shuffle)
+            jrdd = self.ctx._jvm.SerDeUtil.javaToPython(data_java_rdd)
+        else:
+            jrdd = self._jrdd.coalesce(numPartitions, shuffle)
         return RDD(jrdd, self.ctx, self._jrdd_deserializer)
 
     def zip(self, other):

http://git-wip-us.apache.org/repos/asf/spark/blob/07508bd0/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index b075691..3e0bd16 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -914,6 +914,16 @@ class RDDTests(ReusedPySparkTestCase):
         self.assertEqual(partitions[0], [(0, 5), (0, 8), (2, 6)])
         self.assertEqual(partitions[1], [(1, 3), (3, 8), (3, 8)])
 
+    def test_repartition_no_skewed(self):
+        num_partitions = 20
+        a = self.sc.parallelize(range(int(1000)), 2)
+        l = a.repartition(num_partitions).glom().map(len).collect()
+        zeros = len([x for x in l if x == 0])
+        self.assertTrue(zeros == 0)
+        l = a.coalesce(num_partitions, True).glom().map(len).collect()
+        zeros = len([x for x in l if x == 0])
+        self.assertTrue(zeros == 0)
+
     def test_distinct(self):
         rdd = self.sc.parallelize((1, 2, 3)*10, 10)
         self.assertEqual(rdd.getNumPartitions(), 10)

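As a quick interactive sanity check after this patch (a sketch mirroring the new test, not part of the commit), both code paths should leave no partition empty:

    # Both repartition() and coalesce(..., shuffle=True) now go through the
    # Java-object conversion, so elements spread across all output partitions.
    l = sc.parallelize(range(1000), 2).repartition(20).glom().map(len).collect()
    assert min(l) > 0 and sum(l) == 1000  # no empty partitions, nothing lost
    l = sc.parallelize(range(1000), 2).coalesce(20, shuffle=True).glom().map(len).collect()
    assert min(l) > 0 and sum(l) == 1000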
