You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/01/15 20:41:00 UTC

spark git commit: [SPARK-5224] [PySpark] improve performance of parallelize list/ndarray

Repository: spark
Updated Branches:
  refs/heads/master 4b325c77a -> 3c8650c12


[SPARK-5224] [PySpark] improve performance of parallelize list/ndarray

After the default batchSize changed to 0 (batched based on the size of object), but parallelize() still use BatchedSerializer with batchSize=1, this PR will use batchSize=1024 for parallelize by default.

Also, BatchedSerializer did not work well with list and numpy.ndarray, this improve BatchedSerializer by using __len__ and __getslice__.

Here is the benchmark for parallelize 1 millions int with list or ndarray:

    |          before     |   after  | improvements
 ------- | ------------ | ------------- | -------
list |   11.7 s  | 0.8 s |  14x
numpy.ndarray     |  32 s  |   0.7 s | 40x

Author: Davies Liu <da...@databricks.com>

Closes #4024 from davies/opt_numpy and squashes the following commits:

7618c7c [Davies Liu] improve performance of parallelize list/ndarray


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c8650c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c8650c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c8650c1

Branch: refs/heads/master
Commit: 3c8650c12ad7a97852e7bd76153210493fd83e92
Parents: 4b325c7
Author: Davies Liu <da...@databricks.com>
Authored: Thu Jan 15 11:40:41 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Jan 15 11:40:41 2015 -0800

----------------------------------------------------------------------
 python/pyspark/context.py     | 2 +-
 python/pyspark/serializers.py | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3c8650c1/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 593d74b..64f6a3c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -319,7 +319,7 @@ class SparkContext(object):
         # Make sure we distribute data evenly if it's smaller than self.batchSize
         if "__len__" not in dir(c):
             c = list(c)    # Make it a list so we can compute its length
-        batchSize = max(1, min(len(c) // numSlices, self._batchSize))
+        batchSize = max(1, min(len(c) // numSlices, self._batchSize or 1024))
         serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
         serializer.dump_stream(c, tempFile)
         tempFile.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/3c8650c1/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index bd08c9a..b8bda83 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -181,6 +181,10 @@ class BatchedSerializer(Serializer):
     def _batched(self, iterator):
         if self.batchSize == self.UNLIMITED_BATCH_SIZE:
             yield list(iterator)
+        elif hasattr(iterator, "__len__") and hasattr(iterator, "__getslice__"):
+            n = len(iterator)
+            for i in xrange(0, n, self.batchSize):
+                yield iterator[i: i + self.batchSize]
         else:
             items = []
             count = 0


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org