Posted to commits@spark.apache.org by jo...@apache.org on 2015/01/15 20:41:00 UTC
spark git commit: [SPARK-5224] [PySpark] improve performance of parallelize list/ndarray
Repository: spark
Updated Branches:
refs/heads/master 4b325c77a -> 3c8650c12
[SPARK-5224] [PySpark] improve performance of parallelize list/ndarray
After the default batchSize was changed to 0 (meaning batches are sized automatically based on object size), parallelize() still used BatchedSerializer with batchSize=1. This PR makes parallelize() use batchSize=1024 by default instead.
Also, BatchedSerializer did not work well with list and numpy.ndarray; this PR improves it by batching via __len__ and __getslice__ when the input supports them.
Here is the benchmark for parallelizing 1 million ints as a list or ndarray:
              | before | after | improvements
------------- | ------ | ----- | ------------
list          | 11.7 s | 0.8 s | 14x
numpy.ndarray | 32 s   | 0.7 s | 40x
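The slicing optimization from the patch can be illustrated with a minimal Python 3 sketch (not the Spark source; the patch itself targets Python 2, where sequence slicing went through __getslice__, so the check below uses __getitem__ instead): sequences that know their length are cut into batches with one slice each, while plain iterators fall back to appending items one at a time.

```python
def batched(seq, batch_size):
    """Yield batches of batch_size items from seq."""
    if hasattr(seq, "__len__") and hasattr(seq, "__getitem__"):
        # Fast path: one slice per batch instead of per-item appends.
        for i in range(0, len(seq), batch_size):
            yield seq[i:i + batch_size]
    else:
        # Fallback for plain iterators: accumulate items one by one.
        batch = []
        for item in seq:
            batch.append(item)
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:
            yield batch

print(list(batched([1, 2, 3, 4, 5], 2)))        # [[1, 2], [3, 4], [5]]
print(list(batched(iter(range(5)), 2)))         # [[0, 1], [2, 3], [4]]
```

The slicing path is what makes list and numpy.ndarray fast here: a single slice of a list or ndarray is far cheaper than a Python-level loop over its elements.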
Author: Davies Liu <da...@databricks.com>
Closes #4024 from davies/opt_numpy and squashes the following commits:
7618c7c [Davies Liu] improve performance of parallelize list/ndarray
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c8650c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c8650c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c8650c1
Branch: refs/heads/master
Commit: 3c8650c12ad7a97852e7bd76153210493fd83e92
Parents: 4b325c7
Author: Davies Liu <da...@databricks.com>
Authored: Thu Jan 15 11:40:41 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Jan 15 11:40:41 2015 -0800
----------------------------------------------------------------------
python/pyspark/context.py | 2 +-
python/pyspark/serializers.py | 4 ++++
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3c8650c1/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 593d74b..64f6a3c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -319,7 +319,7 @@ class SparkContext(object):
# Make sure we distribute data evenly if it's smaller than self.batchSize
if "__len__" not in dir(c):
c = list(c) # Make it a list so we can compute its length
- batchSize = max(1, min(len(c) // numSlices, self._batchSize))
+ batchSize = max(1, min(len(c) // numSlices, self._batchSize or 1024))
serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
serializer.dump_stream(c, tempFile)
tempFile.close()
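The one-line change above can be read as a standalone sketch (hypothetical helper, not Spark code): when the configured batch size is 0 ("auto"), the `or` short-circuit substitutes 1024, instead of letting min() clamp the batch size down via the old default of 1.

```python
def choose_batch_size(n_items, num_slices, configured_batch_size):
    """Pick a batch size: at least 1, at most the configured size
    (treating 0 / "auto" as 1024), spread evenly across slices."""
    return max(1, min(n_items // num_slices, configured_batch_size or 1024))

# With "auto" (0), a large collection now gets batches of up to 1024:
print(choose_batch_size(1_000_000, 4, 0))   # 1024
# Small collections are still spread evenly across the slices:
print(choose_batch_size(10, 4, 0))          # 2
```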
http://git-wip-us.apache.org/repos/asf/spark/blob/3c8650c1/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index bd08c9a..b8bda83 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -181,6 +181,10 @@ class BatchedSerializer(Serializer):
def _batched(self, iterator):
if self.batchSize == self.UNLIMITED_BATCH_SIZE:
yield list(iterator)
+ elif hasattr(iterator, "__len__") and hasattr(iterator, "__getslice__"):
+ n = len(iterator)
+ for i in xrange(0, n, self.batchSize):
+ yield iterator[i: i + self.batchSize]
else:
items = []
count = 0