You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/10/10 23:14:16 UTC
git commit: [SPARK-3886] [PySpark] use AutoBatchedSerializer by default
Repository: spark
Updated Branches:
refs/heads/master 90f73fcc4 -> 72f36ee57
[SPARK-3886] [PySpark] use AutoBatchedSerializer by default
Use AutoBatchedSerializer by default. It chooses the batch size automatically, based on the size of the serialized objects, so that each serialized batch falls within [64 KB, 640 KB].
In the JVM, the serializer also tracks the objects in each batch to detect duplicate objects, so a larger batch may cause an OOM in the JVM.
Author: Davies Liu <da...@gmail.com>
Closes #2740 from davies/batchsize and squashes the following commits:
52cdb88 [Davies Liu] update docs
185f2b9 [Davies Liu] use AutoBatchedSerializer by default
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72f36ee5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72f36ee5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72f36ee5
Branch: refs/heads/master
Commit: 72f36ee571ad27c7c7c70bb9aecc7e6ef51dfd44
Parents: 90f73fc
Author: Davies Liu <da...@gmail.com>
Authored: Fri Oct 10 14:14:05 2014 -0700
Committer: Josh Rosen <jo...@apache.org>
Committed: Fri Oct 10 14:14:05 2014 -0700
----------------------------------------------------------------------
python/pyspark/context.py | 11 +++++++----
python/pyspark/serializers.py | 4 ++--
2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/72f36ee5/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6fb30d6..85c0462 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -29,7 +29,7 @@ from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
- PairDeserializer, CompressedSerializer
+ PairDeserializer, CompressedSerializer, AutoBatchedSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from pyspark.traceback_utils import CallSite, first_spark_call
@@ -67,7 +67,7 @@ class SparkContext(object):
_default_batch_size_for_serialized_input = 10
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
- environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+ environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
gateway=None):
"""
Create a new SparkContext. At least the master and app name should be set,
@@ -83,8 +83,9 @@ class SparkContext(object):
:param environment: A dictionary of environment variables to set on
worker nodes.
:param batchSize: The number of Python objects represented as a single
- Java object. Set 1 to disable batching or -1 to use an
- unlimited batch size.
+ Java object. Set 1 to disable batching, 0 to automatically choose
+ the batch size based on object sizes, or -1 to use an unlimited
+ batch size
:param serializer: The serializer for RDDs.
:param conf: A L{SparkConf} object setting Spark properties.
:param gateway: Use an existing gateway and JVM, otherwise a new JVM
@@ -117,6 +118,8 @@ class SparkContext(object):
self._unbatched_serializer = serializer
if batchSize == 1:
self.serializer = self._unbatched_serializer
+ elif batchSize == 0:
+ self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
else:
self.serializer = BatchedSerializer(self._unbatched_serializer,
batchSize)
http://git-wip-us.apache.org/repos/asf/spark/blob/72f36ee5/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 099fa54..3d1a34b 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -220,7 +220,7 @@ class AutoBatchedSerializer(BatchedSerializer):
Choose the size of batch automatically based on the size of object
"""
- def __init__(self, serializer, bestSize=1 << 20):
+ def __init__(self, serializer, bestSize=1 << 16):
BatchedSerializer.__init__(self, serializer, -1)
self.bestSize = bestSize
@@ -247,7 +247,7 @@ class AutoBatchedSerializer(BatchedSerializer):
other.serializer == self.serializer)
def __str__(self):
- return "BatchedSerializer<%s>" % str(self.serializer)
+ return "AutoBatchedSerializer<%s>" % str(self.serializer)
class CartesianDeserializer(FramedSerializer):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org