You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/10/18 21:25:22 UTC
spark git commit: [SPARK-17817] [PYSPARK] [FOLLOWUP] PySpark RDD
Repartitioning Results in Highly Skewed Partition Sizes
Repository: spark
Updated Branches:
refs/heads/master cd106b050 -> 1e35e9693
[SPARK-17817] [PYSPARK] [FOLLOWUP] PySpark RDD Repartitioning Results in Highly Skewed Partition Sizes
## What changes were proposed in this pull request?
This change is a followup for #15389 which calls `_to_java_object_rdd()` to solve this issue. Due to the concern of the possible expensive cost of the call, we can choose to decrease the batch size to solve this issue too.
Simple benchmark:
import time
num_partitions = 20000
a = sc.parallelize(range(int(1e6)), 2)
start = time.time()
l = a.repartition(num_partitions).glom().map(len).collect()
end = time.time()
print(end - start)
Before: 419.447577953
_to_java_object_rdd(): 421.916361094
decreasing the batch size: 423.712255955
## How was this patch tested?
Jenkins tests.
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #15445 from viirya/repartition-batch-size.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e35e969
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e35e969
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e35e969
Branch: refs/heads/master
Commit: 1e35e969305555dda02cb0788c8143e5f2e1944b
Parents: cd106b0
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Tue Oct 18 14:25:10 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Tue Oct 18 14:25:10 2016 -0700
----------------------------------------------------------------------
python/pyspark/rdd.py | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1e35e969/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 0e2ae19..2de2c2f 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2029,12 +2029,12 @@ class RDD(object):
[[1, 2, 3, 4, 5]]
"""
if shuffle:
- # In Scala's repartition code, we will distribute elements evenly across output
- # partitions. However, the RDD from Python is serialized as a single binary data,
- # so the distribution fails and produces highly skewed partitions. We need to
- # convert it to a RDD of java object before repartitioning.
- data_java_rdd = self._to_java_object_rdd().coalesce(numPartitions, shuffle)
- jrdd = self.ctx._jvm.SerDeUtil.javaToPython(data_java_rdd)
+ # Decrease the batch size in order to distribute evenly the elements across output
+ # partitions. Otherwise, repartition will possibly produce highly skewed partitions.
+ batchSize = min(10, self.ctx._batchSize or 1024)
+ ser = BatchedSerializer(PickleSerializer(), batchSize)
+ selfCopy = self._reserialize(ser)
+ jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
else:
jrdd = self._jrdd.coalesce(numPartitions, shuffle)
return RDD(jrdd, self.ctx, self._jrdd_deserializer)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org