Posted to commits@spark.apache.org by jo...@apache.org on 2014/11/04 08:56:28 UTC

git commit: [SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.

Repository: spark
Updated Branches:
  refs/heads/master b671ce047 -> e4f42631a


[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.

This PR simplifies the serializers: it always uses a batched serializer (AutoBatchedSerializer by default), even when the batch size is 1.
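
For context, the adaptive batching used here grows or shrinks the batch based on the size of the serialized output. Below is a minimal Python sketch of that heuristic, mirroring the AutoBatchedPickler added in the diff (the function name and helper are illustrative, not part of the PySpark API):

    import pickle

    def auto_batched_chunks(iterator, low=1 << 20, high=10 << 20):
        """Yield pickled chunks, adapting how many objects go into each one."""
        batch = 1
        buf = []
        for obj in iterator:
            buf.append(obj)
            if len(buf) < batch:
                continue
            chunk = pickle.dumps(buf, protocol=2)
            yield chunk
            # Keep each serialized chunk roughly between 1 MB and 10 MB,
            # doubling or halving the object count as needed.
            if len(chunk) < low:
                batch *= 2
            elif len(chunk) > high and batch > 1:
                batch //= 2
            buf = []
        if buf:
            yield pickle.dumps(buf, protocol=2)

    # Example: the batch size doubles until each chunk nears the 1 MB floor.
    chunks = list(auto_batched_chunks("item-%d" % i for i in range(10000)))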

Author: Davies Liu <da...@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Josh Rosen <jo...@databricks.com>

Closes #2920 from davies/fix_autobatch and squashes the following commits:

e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4f42631
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4f42631
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4f42631

Branch: refs/heads/master
Commit: e4f42631a68b473ce706429915f3f08042af2119
Parents: b671ce0
Author: Davies Liu <da...@databricks.com>
Authored: Mon Nov 3 23:56:14 2014 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Nov 3 23:56:14 2014 -0800

----------------------------------------------------------------------
 .../spark/api/python/PythonHadoopUtil.scala     |   6 +-
 .../org/apache/spark/api/python/PythonRDD.scala | 110 +----------------
 .../org/apache/spark/api/python/SerDeUtil.scala | 121 ++++++++++++++-----
 .../WriteInputFormatTestDataGenerator.scala     |  10 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala |   2 +-
 python/pyspark/context.py                       |  58 +++------
 python/pyspark/mllib/common.py                  |   2 +-
 python/pyspark/mllib/recommendation.py          |   2 +-
 python/pyspark/rdd.py                           |  91 ++++++--------
 python/pyspark/serializers.py                   |  36 ++----
 python/pyspark/shuffle.py                       |   7 +-
 python/pyspark/sql.py                           |  18 ++-
 python/pyspark/tests.py                         |  66 ++--------
 .../scala/org/apache/spark/sql/SchemaRDD.scala  |  10 +-
 14 files changed, 201 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 49dc95f..5ba6617 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -61,8 +61,7 @@ private[python] object Converter extends Logging {
  * Other objects are passed through without conversion.
  */
 private[python] class WritableToJavaConverter(
-    conf: Broadcast[SerializableWritable[Configuration]],
-    batchSize: Int) extends Converter[Any, Any] {
+    conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
 
   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -94,8 +93,7 @@ private[python] class WritableToJavaConverter(
           map.put(convertWritable(k), convertWritable(v))
         }
         map
-      case w: Writable =>
-        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
+      case w: Writable => WritableUtils.clone(w, conf.value.value)
       case other => other
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 61b125e..e94ccdc 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -22,12 +22,10 @@ import java.net._
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 
 import com.google.common.base.Charsets.UTF_8
-import net.razorvine.pickle.{Pickler, Unpickler}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
@@ -442,7 +440,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -468,7 +466,7 @@ private[spark] object PythonRDD extends Logging {
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -494,7 +492,7 @@ private[spark] object PythonRDD extends Logging {
         None, inputFormatClass, keyClass, valueClass, conf)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -537,7 +535,7 @@ private[spark] object PythonRDD extends Logging {
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -563,7 +561,7 @@ private[spark] object PythonRDD extends Logging {
         None, inputFormatClass, keyClass, valueClass, conf)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
-      new WritableToJavaConverter(confBroadcasted, batchSize))
+      new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
@@ -746,104 +744,6 @@ private[spark] object PythonRDD extends Logging {
       converted.saveAsHadoopDataset(new JobConf(conf))
     }
   }
-
-
-  /**
-   * Convert an RDD of serialized Python dictionaries to Scala Maps (no recursive conversions).
-   */
-  @deprecated("PySpark does not use it anymore", "1.1")
-  def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
-    pyRDD.rdd.mapPartitions { iter =>
-      val unpickle = new Unpickler
-      SerDeUtil.initialize()
-      iter.flatMap { row =>
-        unpickle.loads(row) match {
-          // in case of objects are pickled in batch mode
-          case objs: JArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
-          // not in batch mode
-          case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
-        }
-      }
-    }
-  }
-
-  /**
-   * Convert an RDD of serialized Python tuple to Array (no recursive conversions).
-   * It is only used by pyspark.sql.
-   */
-  def pythonToJavaArray(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Array[_]] = {
-
-    def toArray(obj: Any): Array[_] = {
-      obj match {
-        case objs: JArrayList[_] =>
-          objs.toArray
-        case obj if obj.getClass.isArray =>
-          obj.asInstanceOf[Array[_]].toArray
-      }
-    }
-
-    pyRDD.rdd.mapPartitions { iter =>
-      val unpickle = new Unpickler
-      iter.flatMap { row =>
-        val obj = unpickle.loads(row)
-        if (batched) {
-          obj.asInstanceOf[JArrayList[_]].map(toArray)
-        } else {
-          Seq(toArray(obj))
-        }
-      }
-    }.toJavaRDD()
-  }
-
-  private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
-    private val pickle = new Pickler()
-    private var batch = 1
-    private val buffer = new mutable.ArrayBuffer[Any]
-
-    override def hasNext(): Boolean = iter.hasNext
-
-    override def next(): Array[Byte] = {
-      while (iter.hasNext && buffer.length < batch) {
-        buffer += iter.next()
-      }
-      val bytes = pickle.dumps(buffer.toArray)
-      val size = bytes.length
-      // let  1M < size < 10M
-      if (size < 1024 * 1024) {
-        batch *= 2
-      } else if (size > 1024 * 1024 * 10 && batch > 1) {
-        batch /= 2
-      }
-      buffer.clear()
-      bytes
-    }
-  }
-
-  /**
-   * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
-   * PySpark.
-   */
-  def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
-    jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
-  }
-
-  /**
-    * Convert an RDD of serialized Python objects to RDD of objects, that is usable by PySpark.
-    */
-  def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
-    pyRDD.rdd.mapPartitions { iter =>
-      SerDeUtil.initialize()
-      val unpickle = new Unpickler
-      iter.flatMap { row =>
-        val obj = unpickle.loads(row)
-        if (batched) {
-          obj.asInstanceOf[JArrayList[_]].asScala
-        } else {
-          Seq(obj)
-        }
-      }
-    }.toJavaRDD()
-  }
 }
 
 private

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index ebdc353..a4153aa 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -18,8 +18,13 @@
 package org.apache.spark.api.python
 
 import java.nio.ByteOrder
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.spark.api.java.JavaRDD
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.Failure
 import scala.util.Try
 
@@ -89,6 +94,73 @@ private[spark] object SerDeUtil extends Logging {
   }
   initialize()
 
+
+  /**
+   * Convert an RDD of Java objects to Array (no recursive conversions).
+   * It is only used by pyspark.sql.
+   */
+  def toJavaArray(jrdd: JavaRDD[Any]): JavaRDD[Array[_]] = {
+    jrdd.rdd.map {
+      case objs: JArrayList[_] =>
+        objs.toArray
+      case obj if obj.getClass.isArray =>
+        obj.asInstanceOf[Array[_]].toArray
+    }.toJavaRDD()
+  }
+
+  /**
+   * Choose batch size based on size of objects
+   */
+  private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
+    private val pickle = new Pickler()
+    private var batch = 1
+    private val buffer = new mutable.ArrayBuffer[Any]
+
+    override def hasNext: Boolean = iter.hasNext
+
+    override def next(): Array[Byte] = {
+      while (iter.hasNext && buffer.length < batch) {
+        buffer += iter.next()
+      }
+      val bytes = pickle.dumps(buffer.toArray)
+      val size = bytes.length
+      // let  1M < size < 10M
+      if (size < 1024 * 1024) {
+        batch *= 2
+      } else if (size > 1024 * 1024 * 10 && batch > 1) {
+        batch /= 2
+      }
+      buffer.clear()
+      bytes
+    }
+  }
+
+  /**
+   * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
+   * PySpark.
+   */
+  private[spark] def javaToPython(jRDD: JavaRDD[_]): JavaRDD[Array[Byte]] = {
+    jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
+  }
+
+  /**
+   * Convert an RDD of serialized Python objects to RDD of objects, that is usable by PySpark.
+   */
+  def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
+    pyRDD.rdd.mapPartitions { iter =>
+      initialize()
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        val obj = unpickle.loads(row)
+        if (batched) {
+          obj.asInstanceOf[JArrayList[_]].asScala
+        } else {
+          Seq(obj)
+        }
+      }
+    }.toJavaRDD()
+  }
+
   private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
     val pickle = new Pickler
     val kt = Try {
@@ -128,17 +200,18 @@ private[spark] object SerDeUtil extends Logging {
    */
   def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = {
     val (keyFailed, valueFailed) = checkPickle(rdd.first())
+
     rdd.mapPartitions { iter =>
-      val pickle = new Pickler
       val cleaned = iter.map { case (k, v) =>
         val key = if (keyFailed) k.toString else k
         val value = if (valueFailed) v.toString else v
         Array[Any](key, value)
       }
-      if (batchSize > 1) {
-        cleaned.grouped(batchSize).map(batched => pickle.dumps(seqAsJavaList(batched)))
+      if (batchSize == 0) {
+        new AutoBatchedPickler(cleaned)
       } else {
-        cleaned.map(pickle.dumps(_))
+        val pickle = new Pickler
+        cleaned.grouped(batchSize).map(batched => pickle.dumps(seqAsJavaList(batched)))
       }
     }
   }
@@ -146,36 +219,22 @@ private[spark] object SerDeUtil extends Logging {
   /**
    * Convert an RDD of serialized Python tuple (K, V) to RDD[(K, V)].
    */
-  def pythonToPairRDD[K, V](pyRDD: RDD[Array[Byte]], batchSerialized: Boolean): RDD[(K, V)] = {
+  def pythonToPairRDD[K, V](pyRDD: RDD[Array[Byte]], batched: Boolean): RDD[(K, V)] = {
     def isPair(obj: Any): Boolean = {
-      Option(obj.getClass.getComponentType).map(!_.isPrimitive).getOrElse(false) &&
+      Option(obj.getClass.getComponentType).exists(!_.isPrimitive) &&
         obj.asInstanceOf[Array[_]].length == 2
     }
-    pyRDD.mapPartitions { iter =>
-      initialize()
-      val unpickle = new Unpickler
-      val unpickled =
-        if (batchSerialized) {
-          iter.flatMap { batch =>
-            unpickle.loads(batch) match {
-              case objs: java.util.List[_] => collectionAsScalaIterable(objs)
-              case other => throw new SparkException(
-                s"Unexpected type ${other.getClass.getName} for batch serialized Python RDD")
-            }
-          }
-        } else {
-          iter.map(unpickle.loads(_))
-        }
-      unpickled.map {
-        case obj if isPair(obj) =>
-          // we only accept (K, V)
-          val arr = obj.asInstanceOf[Array[_]]
-          (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V])
-        case other => throw new SparkException(
-          s"RDD element of type ${other.getClass.getName} cannot be used")
-      }
+
+    val rdd = pythonToJava(pyRDD, batched).rdd
+    rdd.first match {
+      case obj if isPair(obj) =>
+        // we only accept (K, V)
+      case other => throw new SparkException(
+        s"RDD element of type ${other.getClass.getName} cannot be used")
+    }
+    rdd.map { obj =>
+      val arr = obj.asInstanceOf[Array[_]]
+      (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V])
     }
   }
-
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index e9ca916..c0cbd28 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -176,11 +176,11 @@ object WriteInputFormatTestDataGenerator {
 
     // Create test data for arbitrary custom writable TestWritable
     val testClass = Seq(
-      ("1", TestWritable("test1", 123, 54.0)),
-      ("2", TestWritable("test2", 456, 8762.3)),
-      ("1", TestWritable("test3", 123, 423.1)),
-      ("3", TestWritable("test56", 456, 423.5)),
-      ("2", TestWritable("test2", 123, 5435.2))
+      ("1", TestWritable("test1", 1, 1.0)),
+      ("2", TestWritable("test2", 2, 2.3)),
+      ("3", TestWritable("test3", 3, 3.1)),
+      ("5", TestWritable("test56", 5, 5.5)),
+      ("4", TestWritable("test4", 4, 4.2))
     )
     val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
     rdd.saveAsNewAPIHadoopFile(classPath,

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index acdc67d..65b98a8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -736,7 +736,7 @@ private[spark] object SerDe extends Serializable {
   def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
     jRDD.rdd.mapPartitions { iter =>
       initialize()  // let it called in executor
-      new PythonRDD.AutoBatchedPickler(iter)
+      new SerDeUtil.AutoBatchedPickler(iter)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5f8dced..a0e4821 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -63,7 +63,6 @@ class SparkContext(object):
     _active_spark_context = None
     _lock = Lock()
     _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
-    _default_batch_size_for_serialized_input = 10
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
@@ -115,9 +114,7 @@ class SparkContext(object):
         self._conf = conf or SparkConf(_jvm=self._jvm)
         self._batchSize = batchSize  # -1 represents an unlimited batch size
         self._unbatched_serializer = serializer
-        if batchSize == 1:
-            self.serializer = self._unbatched_serializer
-        elif batchSize == 0:
+        if batchSize == 0:
             self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
         else:
             self.serializer = BatchedSerializer(self._unbatched_serializer,
@@ -305,12 +302,8 @@ class SparkContext(object):
         # Make sure we distribute data evenly if it's smaller than self.batchSize
         if "__len__" not in dir(c):
             c = list(c)    # Make it a list so we can compute its length
-        batchSize = min(len(c) // numSlices, self._batchSize)
-        if batchSize > 1:
-            serializer = BatchedSerializer(self._unbatched_serializer,
-                                           batchSize)
-        else:
-            serializer = self._unbatched_serializer
+        batchSize = max(1, min(len(c) // numSlices, self._batchSize))
+        serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
         serializer.dump_stream(c, tempFile)
         tempFile.close()
         readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
@@ -328,8 +321,7 @@ class SparkContext(object):
         [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
         """
         minPartitions = minPartitions or self.defaultMinPartitions
-        return RDD(self._jsc.objectFile(name, minPartitions), self,
-                   BatchedSerializer(PickleSerializer()))
+        return RDD(self._jsc.objectFile(name, minPartitions), self)
 
     def textFile(self, name, minPartitions=None, use_unicode=True):
         """
@@ -405,7 +397,7 @@ class SparkContext(object):
         return jm
 
     def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
-                     valueConverter=None, minSplits=None, batchSize=None):
+                     valueConverter=None, minSplits=None, batchSize=0):
         """
         Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -427,17 +419,15 @@ class SparkContext(object):
         :param minSplits: minimum splits in dataset
                (default min(2, sc.defaultParallelism))
         :param batchSize: The number of Python objects represented as a single
-               Java object. (default sc._default_batch_size_for_serialized_input)
+               Java object. (default 0, choose batchSize automatically)
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
                                                 keyConverter, valueConverter, minSplits, batchSize)
-        return RDD(jrdd, self, ser)
+        return RDD(jrdd, self)
 
     def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
-                         valueConverter=None, conf=None, batchSize=None):
+                         valueConverter=None, conf=None, batchSize=0):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -458,18 +448,16 @@ class SparkContext(object):
         :param conf: Hadoop configuration, passed in as a dict
                (None by default)
         :param batchSize: The number of Python objects represented as a single
-               Java object. (default sc._default_batch_size_for_serialized_input)
+               Java object. (default 0, choose batchSize automatically)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
                                                     valueClass, keyConverter, valueConverter,
                                                     jconf, batchSize)
-        return RDD(jrdd, self, ser)
+        return RDD(jrdd, self)
 
     def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
-                        valueConverter=None, conf=None, batchSize=None):
+                        valueConverter=None, conf=None, batchSize=0):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
         Hadoop configuration, which is passed in as a Python dict.
@@ -487,18 +475,16 @@ class SparkContext(object):
         :param conf: Hadoop configuration, passed in as a dict
                (None by default)
         :param batchSize: The number of Python objects represented as a single
-               Java object. (default sc._default_batch_size_for_serialized_input)
+               Java object. (default 0, choose batchSize automatically)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
                                                    valueClass, keyConverter, valueConverter,
                                                    jconf, batchSize)
-        return RDD(jrdd, self, ser)
+        return RDD(jrdd, self)
 
     def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
-                   valueConverter=None, conf=None, batchSize=None):
+                   valueConverter=None, conf=None, batchSize=0):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -519,18 +505,16 @@ class SparkContext(object):
         :param conf: Hadoop configuration, passed in as a dict
                (None by default)
         :param batchSize: The number of Python objects represented as a single
-               Java object. (default sc._default_batch_size_for_serialized_input)
+               Java object. (default 0, choose batchSize automatically)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
                                               valueClass, keyConverter, valueConverter,
                                               jconf, batchSize)
-        return RDD(jrdd, self, ser)
+        return RDD(jrdd, self)
 
     def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
-                  valueConverter=None, conf=None, batchSize=None):
+                  valueConverter=None, conf=None, batchSize=0):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
         Hadoop configuration, which is passed in as a Python dict.
@@ -548,15 +532,13 @@ class SparkContext(object):
         :param conf: Hadoop configuration, passed in as a dict
                (None by default)
         :param batchSize: The number of Python objects represented as a single
-               Java object. (default sc._default_batch_size_for_serialized_input)
+               Java object. (default 0, choose batchSize automatically)
         """
         jconf = self._dictToJavaMap(conf)
-        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
                                              valueClass, keyConverter, valueConverter,
                                              jconf, batchSize)
-        return RDD(jrdd, self, ser)
+        return RDD(jrdd, self)
 
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
@@ -836,7 +818,7 @@ def _test():
     import doctest
     import tempfile
     globs = globals().copy()
-    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    globs['sc'] = SparkContext('local[4]', 'PythonTest')
     globs['tempdir'] = tempfile.mkdtemp()
     atexit.register(lambda: shutil.rmtree(globs['tempdir']))
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
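
A short usage sketch of the batchSize semantics after the context.py change above (assuming a local PySpark build on the path): 0 selects AutoBatchedSerializer, -1 keeps unlimited batches, and any positive value fixes the number of objects per batch.

    from pyspark import SparkContext

    # batchSize=0 (the default) sizes batches automatically from the serialized
    # data; -1 means unlimited; a positive n pickles n objects per batch.
    sc = SparkContext('local[2]', 'batch-demo', batchSize=0)
    rdd = sc.parallelize(range(16), 4)
    print(rdd.glom().collect())  # [[0, 1, 2, 3], [4, 5, 6, 7], ...]
    sc.stop()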

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/python/pyspark/mllib/common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py
index 76864d8..dbe5f69 100644
--- a/python/pyspark/mllib/common.py
+++ b/python/pyspark/mllib/common.py
@@ -96,7 +96,7 @@ def _java2py(sc, r):
 
         if clsName == 'JavaRDD':
             jrdd = sc._jvm.SerDe.javaToPython(r)
-            return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
+            return RDD(jrdd, sc)
 
         elif isinstance(r, (JavaArray, JavaList)) or clsName in _picklable_classes:
             r = sc._jvm.SerDe.dumps(r)

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/python/pyspark/mllib/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 6b32af0..e8b9984 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -117,7 +117,7 @@ def _test():
     import doctest
     import pyspark.mllib.recommendation
     globs = pyspark.mllib.recommendation.__dict__.copy()
-    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    globs['sc'] = SparkContext('local[4]', 'PythonTest')
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 4f025b9..879655d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -120,7 +120,7 @@ class RDD(object):
     operated on in parallel.
     """
 
-    def __init__(self, jrdd, ctx, jrdd_deserializer):
+    def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())):
         self._jrdd = jrdd
         self.is_cached = False
         self.is_checkpointed = False
@@ -129,12 +129,8 @@ class RDD(object):
         self._id = jrdd.id()
         self._partitionFunc = None
 
-    def _toPickleSerialization(self):
-        if (self._jrdd_deserializer == PickleSerializer() or
-                self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
-            return self
-        else:
-            return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
+    def _pickled(self):
+        return self._reserialize(AutoBatchedSerializer(PickleSerializer()))
 
     def id(self):
         """
@@ -446,12 +442,11 @@ class RDD(object):
 
     def _reserialize(self, serializer=None):
         serializer = serializer or self.ctx.serializer
-        if self._jrdd_deserializer == serializer:
-            return self
-        else:
-            converted = self.map(lambda x: x, preservesPartitioning=True)
-            converted._jrdd_deserializer = serializer
-            return converted
+        if self._jrdd_deserializer != serializer:
+            if not isinstance(self, PipelinedRDD):
+                self = self.map(lambda x: x, preservesPartitioning=True)
+            self._jrdd_deserializer = serializer
+        return self
 
     def __add__(self, other):
         """
@@ -1120,9 +1115,8 @@ class RDD(object):
         :param valueConverter: (None by default)
         """
         jconf = self.ctx._dictToJavaMap(conf)
-        pickledRDD = self._toPickleSerialization()
-        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
-        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+        pickledRDD = self._pickled()
+        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
                                                     keyConverter, valueConverter, True)
 
     def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
@@ -1147,9 +1141,8 @@ class RDD(object):
         :param conf: Hadoop job configuration, passed in as a dict (None by default)
         """
         jconf = self.ctx._dictToJavaMap(conf)
-        pickledRDD = self._toPickleSerialization()
-        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
-        self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
+        pickledRDD = self._pickled()
+        self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, True, path,
                                                        outputFormatClass,
                                                        keyClass, valueClass,
                                                        keyConverter, valueConverter, jconf)
@@ -1166,9 +1159,8 @@ class RDD(object):
         :param valueConverter: (None by default)
         """
         jconf = self.ctx._dictToJavaMap(conf)
-        pickledRDD = self._toPickleSerialization()
-        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
-        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+        pickledRDD = self._pickled()
+        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
                                                     keyConverter, valueConverter, False)
 
     def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
@@ -1195,9 +1187,8 @@ class RDD(object):
         :param compressionCodecClass: (None by default)
         """
         jconf = self.ctx._dictToJavaMap(conf)
-        pickledRDD = self._toPickleSerialization()
-        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
-        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
+        pickledRDD = self._pickled()
+        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, True, path,
                                                  outputFormatClass,
                                                  keyClass, valueClass,
                                                  keyConverter, valueConverter,
@@ -1215,9 +1206,8 @@ class RDD(object):
         :param path: path to sequence file
         :param compressionCodecClass: (None by default)
         """
-        pickledRDD = self._toPickleSerialization()
-        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
-        self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
+        pickledRDD = self._pickled()
+        self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, True,
                                                    path, compressionCodecClass)
 
     def saveAsPickleFile(self, path, batchSize=10):
@@ -1232,8 +1222,11 @@ class RDD(object):
         >>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
         [1, 2, 'rdd', 'spark']
         """
-        self._reserialize(BatchedSerializer(PickleSerializer(),
-                                            batchSize))._jrdd.saveAsObjectFile(path)
+        if batchSize == 0:
+            ser = AutoBatchedSerializer(PickleSerializer())
+        else:
+            ser = BatchedSerializer(PickleSerializer(), batchSize)
+        self._reserialize(ser)._jrdd.saveAsObjectFile(path)
 
     def saveAsTextFile(self, path):
         """
@@ -1774,13 +1767,10 @@ class RDD(object):
         >>> x.zip(y).collect()
         [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
         """
-        if self.getNumPartitions() != other.getNumPartitions():
-            raise ValueError("Can only zip with RDD which has the same number of partitions")
-
         def get_batch_size(ser):
             if isinstance(ser, BatchedSerializer):
                 return ser.batchSize
-            return 0
+            return 1
 
         def batch_as(rdd, batchSize):
             ser = rdd._jrdd_deserializer
@@ -1790,12 +1780,16 @@ class RDD(object):
 
         my_batch = get_batch_size(self._jrdd_deserializer)
         other_batch = get_batch_size(other._jrdd_deserializer)
-        if my_batch != other_batch:
-            # use the greatest batchSize to batch the other one.
-            if my_batch > other_batch:
-                other = batch_as(other, my_batch)
-            else:
-                self = batch_as(self, other_batch)
+        # use the smallest batchSize for both of them
+        batchSize = min(my_batch, other_batch)
+        if batchSize <= 0:
+            # auto batched or unlimited
+            batchSize = 100
+        other = batch_as(other, batchSize)
+        self = batch_as(self, batchSize)
+
+        if self.getNumPartitions() != other.getNumPartitions():
+            raise ValueError("Can only zip with RDD which has the same number of partitions")
 
         # There will be an Exception in JVM if there are different number
         # of items in each partitions.
@@ -1934,25 +1928,14 @@ class RDD(object):
 
         return values.collect()
 
-    def _is_pickled(self):
-        """ Return this RDD is serialized by Pickle or not. """
-        der = self._jrdd_deserializer
-        if isinstance(der, PickleSerializer):
-            return True
-        if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
-            return True
-        return False
-
     def _to_java_object_rdd(self):
         """ Return an JavaRDD of Object by unpickling
 
         It will convert each Python object into Java object by Pyrolite, whenever the
         RDD is serialized in batch or not.
         """
-        rdd = self._reserialize(AutoBatchedSerializer(PickleSerializer())) \
-            if not self._is_pickled() else self
-        is_batch = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
-        return self.ctx._jvm.PythonRDD.pythonToJava(rdd._jrdd, is_batch)
+        rdd = self._pickled()
+        return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True)
 
     def countApprox(self, timeout, confidence=0.95):
         """
@@ -2132,7 +2115,7 @@ def _test():
     globs = globals().copy()
     # The small batch size here ensures that we see multiple batches,
     # even in these small test examples:
-    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    globs['sc'] = SparkContext('local[4]', 'PythonTest')
     (failure_count, test_count) = doctest.testmod(
         globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
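
As a usage note for the saveAsPickleFile change above: a batchSize of 0 now routes through AutoBatchedSerializer, while a positive value keeps fixed-size batches. A small sketch, assuming an active SparkContext named sc and a writable temp directory:

    import shutil
    import tempfile

    tmpdir = tempfile.mkdtemp()
    try:
        path = tmpdir + "/pickled"
        # batchSize=0 -> AutoBatchedSerializer(PickleSerializer());
        # a positive value -> BatchedSerializer with that fixed size.
        sc.parallelize(range(100), 4).saveAsPickleFile(path, batchSize=0)
        print(sorted(sc.pickleFile(path, 4).collect()) == list(range(100)))  # True
    finally:
        shutil.rmtree(tmpdir)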

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 904bd9f..d597cbf 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -33,9 +33,8 @@ The serializer is chosen when creating L{SparkContext}:
 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
 >>> sc.stop()
 
-By default, PySpark serialize objects in batches; the batch size can be
-controlled through SparkContext's C{batchSize} parameter
-(the default size is 1024 objects):
+PySpark serialize objects in batches; By default, the batch size is chosen based
+on the size of objects, also configurable by SparkContext's C{batchSize} parameter:
 
 >>> sc = SparkContext('local', 'test', batchSize=2)
 >>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
@@ -48,16 +47,6 @@ which contains two batches of two objects:
 >>> rdd._jrdd.count()
 8L
 >>> sc.stop()
-
-A batch size of -1 uses an unlimited batch size, and a size of 1 disables
-batching:
-
->>> sc = SparkContext('local', 'test', batchSize=1)
->>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
->>> rdd.glom().collect()
-[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
->>> rdd._jrdd.count()
-16L
 """
 
 import cPickle
@@ -73,7 +62,7 @@ import itertools
 from pyspark import cloudpickle
 
 
-__all__ = ["PickleSerializer", "MarshalSerializer"]
+__all__ = ["PickleSerializer", "MarshalSerializer", "UTF8Deserializer"]
 
 
 class SpecialLengths(object):
@@ -113,7 +102,7 @@ class Serializer(object):
         return not self.__eq__(other)
 
     def __repr__(self):
-        return "<%s object>" % self.__class__.__name__
+        return "%s()" % self.__class__.__name__
 
     def __hash__(self):
         return hash(str(self))
@@ -181,6 +170,7 @@ class BatchedSerializer(Serializer):
     """
 
     UNLIMITED_BATCH_SIZE = -1
+    UNKNOWN_BATCH_SIZE = 0
 
     def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
         self.serializer = serializer
@@ -213,10 +203,10 @@ class BatchedSerializer(Serializer):
 
     def __eq__(self, other):
         return (isinstance(other, BatchedSerializer) and
-                other.serializer == self.serializer)
+                other.serializer == self.serializer and other.batchSize == self.batchSize)
 
     def __repr__(self):
-        return "BatchedSerializer<%s>" % str(self.serializer)
+        return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
 
 
 class AutoBatchedSerializer(BatchedSerializer):
@@ -225,7 +215,7 @@ class AutoBatchedSerializer(BatchedSerializer):
     """
 
     def __init__(self, serializer, bestSize=1 << 16):
-        BatchedSerializer.__init__(self, serializer, -1)
+        BatchedSerializer.__init__(self, serializer, self.UNKNOWN_BATCH_SIZE)
         self.bestSize = bestSize
 
     def dump_stream(self, iterator, stream):
@@ -248,10 +238,10 @@ class AutoBatchedSerializer(BatchedSerializer):
 
     def __eq__(self, other):
         return (isinstance(other, AutoBatchedSerializer) and
-                other.serializer == self.serializer)
+                other.serializer == self.serializer and other.bestSize == self.bestSize)
 
     def __str__(self):
-        return "AutoBatchedSerializer<%s>" % str(self.serializer)
+        return "AutoBatchedSerializer(%s)" % str(self.serializer)
 
 
 class CartesianDeserializer(FramedSerializer):
@@ -284,7 +274,7 @@ class CartesianDeserializer(FramedSerializer):
                 self.key_ser == other.key_ser and self.val_ser == other.val_ser)
 
     def __repr__(self):
-        return "CartesianDeserializer<%s, %s>" % \
+        return "CartesianDeserializer(%s, %s)" % \
                (str(self.key_ser), str(self.val_ser))
 
 
@@ -311,7 +301,7 @@ class PairDeserializer(CartesianDeserializer):
                 self.key_ser == other.key_ser and self.val_ser == other.val_ser)
 
     def __repr__(self):
-        return "PairDeserializer<%s, %s>" % (str(self.key_ser), str(self.val_ser))
+        return "PairDeserializer(%s, %s)" % (str(self.key_ser), str(self.val_ser))
 
 
 class NoOpSerializer(FramedSerializer):
@@ -430,7 +420,7 @@ class MarshalSerializer(FramedSerializer):
 class AutoSerializer(FramedSerializer):
 
     """
-    Choose marshal or cPickle as serialization protocol autumatically
+    Choose marshal or cPickle as serialization protocol automatically
     """
 
     def __init__(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/python/pyspark/shuffle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index d57a802..5931e92 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -25,7 +25,7 @@ import itertools
 import random
 
 import pyspark.heapq3 as heapq
-from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
 
 try:
     import psutil
@@ -213,8 +213,7 @@ class ExternalMerger(Merger):
         Merger.__init__(self, aggregator)
         self.memory_limit = memory_limit
         # default serializer is only used for tests
-        self.serializer = serializer or \
-            BatchedSerializer(PickleSerializer(), 1024)
+        self.serializer = serializer or AutoBatchedSerializer(PickleSerializer())
         self.localdirs = localdirs or _get_local_dirs(str(id(self)))
         # number of partitions when spill data into disks
         self.partitions = partitions
@@ -470,7 +469,7 @@ class ExternalSorter(object):
     def __init__(self, memory_limit, serializer=None):
         self.memory_limit = memory_limit
         self.local_dirs = _get_local_dirs("sort")
-        self.serializer = serializer or BatchedSerializer(PickleSerializer(), 1024)
+        self.serializer = serializer or AutoBatchedSerializer(PickleSerializer())
 
     def _get_path(self, n):
         """ Choose one directory for spill by number n """

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index d16c18b..e5d62a4 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -44,7 +44,8 @@ from py4j.protocol import Py4JError
 from py4j.java_collections import ListConverter, MapConverter
 
 from pyspark.rdd import RDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer, CloudPickleSerializer
+from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \
+    CloudPickleSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.traceback_utils import SCCallSiteSync
 
@@ -1233,7 +1234,6 @@ class SQLContext(object):
         self._sc = sparkContext
         self._jsc = self._sc._jsc
         self._jvm = self._sc._jvm
-        self._pythonToJava = self._jvm.PythonRDD.pythonToJavaArray
         self._scala_SQLContext = sqlContext
 
     @property
@@ -1263,8 +1263,8 @@ class SQLContext(object):
         """
         func = lambda _, it: imap(lambda x: f(*x), it)
         command = (func, None,
-                   BatchedSerializer(PickleSerializer(), 1024),
-                   BatchedSerializer(PickleSerializer(), 1024))
+                   AutoBatchedSerializer(PickleSerializer()),
+                   AutoBatchedSerializer(PickleSerializer()))
         ser = CloudPickleSerializer()
         pickled_command = ser.dumps(command)
         if len(pickled_command) > (1 << 20):  # 1M
@@ -1443,8 +1443,7 @@ class SQLContext(object):
         converter = _python_to_sql_converter(schema)
         rdd = rdd.map(converter)
 
-        batched = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
-        jrdd = self._pythonToJava(rdd._jrdd, batched)
+        jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
         srdd = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
         return SchemaRDD(srdd.toJavaSchemaRDD(), self)
 
@@ -1841,7 +1840,7 @@ class SchemaRDD(RDD):
         self.is_checkpointed = False
         self.ctx = self.sql_ctx._sc
         # the _jrdd is created by javaToPython(), serialized by pickle
-        self._jrdd_deserializer = BatchedSerializer(PickleSerializer())
+        self._jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
 
     @property
     def _jrdd(self):
@@ -2071,16 +2070,13 @@ class SchemaRDD(RDD):
 
 def _test():
     import doctest
-    from array import array
     from pyspark.context import SparkContext
     # let doctest run in pyspark.sql, so DataTypes can be picklable
     import pyspark.sql
     from pyspark.sql import Row, SQLContext
     from pyspark.tests import ExamplePoint, ExamplePointUDT
     globs = pyspark.sql.__dict__.copy()
-    # The small batch size here ensures that we see multiple batches,
-    # even in these small test examples:
-    sc = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    sc = SparkContext('local[4]', 'PythonTest')
     globs['sc'] = sc
     globs['sqlCtx'] = SQLContext(sc)
     globs['rdd'] = sc.parallelize(

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index e947b09..7e61b01 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -242,7 +242,7 @@ class PySparkTestCase(unittest.TestCase):
     def setUp(self):
         self._old_sys_path = list(sys.path)
         class_name = self.__class__.__name__
-        self.sc = SparkContext('local[4]', class_name, batchSize=2)
+        self.sc = SparkContext('local[4]', class_name)
 
     def tearDown(self):
         self.sc.stop()
@@ -253,7 +253,7 @@ class ReusedPySparkTestCase(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
-        cls.sc = SparkContext('local[4]', cls.__name__, batchSize=2)
+        cls.sc = SparkContext('local[4]', cls.__name__)
 
     @classmethod
     def tearDownClass(cls):
@@ -671,7 +671,7 @@ class ProfilerTests(PySparkTestCase):
         self._old_sys_path = list(sys.path)
         class_name = self.__class__.__name__
         conf = SparkConf().set("spark.python.profile", "true")
-        self.sc = SparkContext('local[4]', class_name, batchSize=2, conf=conf)
+        self.sc = SparkContext('local[4]', class_name, conf=conf)
 
     def test_profiler(self):
 
@@ -1012,16 +1012,19 @@ class InputFormatTests(ReusedPySparkTestCase):
         clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
                                             "org.apache.hadoop.io.Text",
                                             "org.apache.spark.api.python.TestWritable").collect())
-        ec = (u'1',
-              {u'__class__': u'org.apache.spark.api.python.TestWritable',
-               u'double': 54.0, u'int': 123, u'str': u'test1'})
-        self.assertEqual(clazz[0], ec)
+        cname = u'org.apache.spark.api.python.TestWritable'
+        ec = [(u'1', {u'__class__': cname, u'double': 1.0, u'int': 1, u'str': u'test1'}),
+              (u'2', {u'__class__': cname, u'double': 2.3, u'int': 2, u'str': u'test2'}),
+              (u'3', {u'__class__': cname, u'double': 3.1, u'int': 3, u'str': u'test3'}),
+              (u'4', {u'__class__': cname, u'double': 4.2, u'int': 4, u'str': u'test4'}),
+              (u'5', {u'__class__': cname, u'double': 5.5, u'int': 5, u'str': u'test56'})]
+        self.assertEqual(clazz, ec)
 
         unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
                                                       "org.apache.hadoop.io.Text",
                                                       "org.apache.spark.api.python.TestWritable",
-                                                      batchSize=1).collect())
-        self.assertEqual(unbatched_clazz[0], ec)
+                                                      ).collect())
+        self.assertEqual(unbatched_clazz, ec)
 
     def test_oldhadoop(self):
         basepath = self.tempdir.name
@@ -1341,51 +1344,6 @@ class OutputFormatTests(ReusedPySparkTestCase):
         result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
         self.assertEqual(result5, data)
 
-    def test_unbatched_save_and_read(self):
-        basepath = self.tempdir.name
-        ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
-        self.sc.parallelize(ei, len(ei)).saveAsSequenceFile(
-            basepath + "/unbatched/")
-
-        unbatched_sequence = sorted(self.sc.sequenceFile(
-            basepath + "/unbatched/",
-            batchSize=1).collect())
-        self.assertEqual(unbatched_sequence, ei)
-
-        unbatched_hadoopFile = sorted(self.sc.hadoopFile(
-            basepath + "/unbatched/",
-            "org.apache.hadoop.mapred.SequenceFileInputFormat",
-            "org.apache.hadoop.io.IntWritable",
-            "org.apache.hadoop.io.Text",
-            batchSize=1).collect())
-        self.assertEqual(unbatched_hadoopFile, ei)
-
-        unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(
-            basepath + "/unbatched/",
-            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
-            "org.apache.hadoop.io.IntWritable",
-            "org.apache.hadoop.io.Text",
-            batchSize=1).collect())
-        self.assertEqual(unbatched_newAPIHadoopFile, ei)
-
-        oldconf = {"mapred.input.dir": basepath + "/unbatched/"}
-        unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
-            "org.apache.hadoop.mapred.SequenceFileInputFormat",
-            "org.apache.hadoop.io.IntWritable",
-            "org.apache.hadoop.io.Text",
-            conf=oldconf,
-            batchSize=1).collect())
-        self.assertEqual(unbatched_hadoopRDD, ei)
-
-        newconf = {"mapred.input.dir": basepath + "/unbatched/"}
-        unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
-            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
-            "org.apache.hadoop.io.IntWritable",
-            "org.apache.hadoop.io.Text",
-            conf=newconf,
-            batchSize=1).collect())
-        self.assertEqual(unbatched_newAPIHadoopRDD, ei)
-
     def test_malformed_RDD(self):
         basepath = self.tempdir.name
         # non-batch-serialized RDD[[(K, V)]] should be rejected

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 3ee2ea0..fbec2f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
 
 import java.util.{List => JList}
 
+import org.apache.spark.api.python.SerDeUtil
+
 import scala.collection.JavaConversions._
 
 import net.razorvine.pickle.Pickler
@@ -385,12 +387,8 @@ class SchemaRDD(
    */
   private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
     val fieldTypes = schema.fields.map(_.dataType)
-    this.mapPartitions { iter =>
-      val pickle = new Pickler
-      iter.map { row =>
-        EvaluatePython.rowToArray(row, fieldTypes)
-      }.grouped(100).map(batched => pickle.dumps(batched.toArray))
-    }
+    val jrdd = this.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD()
+    SerDeUtil.javaToPython(jrdd)
   }
 
   /**

