Posted to commits@spark.apache.org by rx...@apache.org on 2014/06/14 22:17:26 UTC

git commit: [SPARK-2079] Support batching when serializing SchemaRDD to Python

Repository: spark
Updated Branches:
  refs/heads/master 891968509 -> 2550533a2


[SPARK-2079] Support batching when serializing SchemaRDD to Python

Added batching with a default batch size of 10 in SchemaRDD.javaToPython
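
The gain is in how many pickled payloads cross the JVM-to-Python
boundary, not in the rows themselves. As a rough illustration (plain
Python, not Spark code; the row counts are arbitrary):

    import math

    # With a batch size of 10, the number of pickled payloads handed to
    # the Python worker drops from one per row to one per batch.
    def num_payloads(num_rows, batch_size=10):
        return math.ceil(num_rows / batch_size)

    assert num_payloads(1000) == 100      # batched: 100 payloads
    assert num_payloads(1000, 1) == 1000  # unbatched: one per row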

Author: Kan Zhang <kz...@apache.org>

Closes #1023 from kanzhang/SPARK-2079 and squashes the following commits:

2d1915e [Kan Zhang] [SPARK-2079] Add batching in SchemaRDD.javaToPython
19b0c09 [Kan Zhang] [SPARK-2079] Removing unnecessary wrapping in SchemaRDD.javaToPython


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2550533a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2550533a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2550533a

Branch: refs/heads/master
Commit: 2550533a28382664f8fd294b2caa494d12bfc7c1
Parents: 8919685
Author: Kan Zhang <kz...@apache.org>
Authored: Sat Jun 14 13:17:22 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Jun 14 13:17:22 2014 -0700

----------------------------------------------------------------------
 python/pyspark/sql.py                                       | 4 +++-
 .../src/main/scala/org/apache/spark/sql/SchemaRDD.scala     | 9 ++-------
 2 files changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2550533a/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 960d0a8..e344610 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -16,6 +16,7 @@
 #
 
 from pyspark.rdd import RDD
+from pyspark.serializers import BatchedSerializer, PickleSerializer
 
 from py4j.protocol import Py4JError
 
@@ -346,7 +347,8 @@ class SchemaRDD(RDD):
         # TODO: This is inefficient, we should construct the Python Row object
         # in Java land in the javaToPython function. May require a custom
         # pickle serializer in Pyrolite
-        return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d))
+        return RDD(jrdd, self._sc, BatchedSerializer(
+                        PickleSerializer())).map(lambda d: Row(d))
 
     # We override the default cache/persist/checkpoint behavior as we want to cache the underlying
     # SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class
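
On the Python side, wrapping PickleSerializer in BatchedSerializer tells
PySpark that each deserialized object is a list of rows rather than a
single row, so the batches produced on the JVM side are transparently
flattened back into individual Rows. A minimal sketch of that read path
(illustration only; the real serializer reads length-prefixed frames
from a stream, and flatten_rows is a hypothetical helper name):

    import pickle

    def flatten_rows(pickled_batches):
        # Each blob is one pickled list of row dicts (a batch); unpickle
        # it and yield the rows one by one, preserving their order.
        for blob in pickled_batches:
            for row in pickle.loads(blob):
                yield row

    batches = [pickle.dumps([{"a": 1}, {"a": 2}]), pickle.dumps([{"a": 3}])]
    assert list(flatten_rows(batches)) == [{"a": 1}, {"a": 2}, {"a": 3}]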

http://git-wip-us.apache.org/repos/asf/spark/blob/2550533a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 821ac85..89eaba2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -347,16 +347,11 @@ class SchemaRDD(
       val pickle = new Pickler
       iter.map { row =>
         val map: JMap[String, Any] = new java.util.HashMap
-        // TODO: We place the map in an ArrayList so that the object is pickled to a List[Dict].
-        // Ideally we should be able to pickle an object directly into a Python collection so we
-        // don't have to create an ArrayList every time.
-        val arr: java.util.ArrayList[Any] = new java.util.ArrayList
         row.zip(fieldNames).foreach { case (obj, name) =>
           map.put(name, obj)
         }
-        arr.add(map)
-        pickle.dumps(arr)
-      }
+        map
+      }.grouped(10).map(batched => pickle.dumps(batched.toArray))
     }
   }
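
The grouped(10).map(batched => pickle.dumps(batched.toArray)) call above
is the write half of the same contract: each call to the Pyrolite
Pickler now serializes a whole batch of row maps instead of a single
map. A rough Python equivalent of that write path (illustration only;
dump_batches is a hypothetical name, and Spark performs this step on
the JVM via Pyrolite):

    import pickle

    def dump_batches(rows, batch_size=10):
        # Group rows into fixed-size batches and pickle each batch as a
        # list, mirroring grouped(10) followed by pickle.dumps above.
        batch = []
        for row in rows:
            batch.append(row)
            if len(batch) == batch_size:
                yield pickle.dumps(batch)
                batch = []
        if batch:  # final partial batch
            yield pickle.dumps(batch)

    rows = [{"id": i} for i in range(25)]
    assert len(list(dump_batches(rows))) == 3  # batches of 10, 10 and 5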