You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/27 02:37:11 UTC

git commit: [SPARK-2601] [PySpark] Fix Py4J error when transforming pickleFiles

Repository: spark
Updated Branches:
  refs/heads/master 12901643b -> ba46bbed5


[SPARK-2601] [PySpark] Fix Py4J error when transforming pickleFiles

Similar to SPARK-1034, the problem was that Py4J didn’t cope well with the fake ClassTags used in the Java API. There doesn’t appear to be any reason why PythonRDD needs to take a ClassTag, since it simply ignores the element type of the parent RDD, so I removed the type parameter and we no longer pass ClassTags from Python.

Author: Josh Rosen <jo...@apache.org>

Closes #1605 from JoshRosen/spark-2601 and squashes the following commits:

b68e118 [Josh Rosen] Fix Py4J error when transforming pickleFiles [SPARK-2601]


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba46bbed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba46bbed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba46bbed

Branch: refs/heads/master
Commit: ba46bbed5d32aec0f11f0b71c82bba8dbe19f05a
Parents: 1290164
Author: Josh Rosen <jo...@apache.org>
Authored: Sat Jul 26 17:37:05 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sat Jul 26 17:37:05 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala  | 4 ++--
 python/pyspark/rdd.py                                       | 4 +---
 python/pyspark/tests.py                                     | 9 +++++++++
 3 files changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba46bbed/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index d6b0988..d87783e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -37,8 +37,8 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
-private[spark] class PythonRDD[T: ClassTag](
-    parent: RDD[T],
+private[spark] class PythonRDD(
+    parent: RDD[_],
     command: Array[Byte],
     envVars: JMap[String, String],
     pythonIncludes: JList[String],

http://git-wip-us.apache.org/repos/asf/spark/blob/ba46bbed/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 113a082..b84d976 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1687,7 +1687,6 @@ class PipelinedRDD(RDD):
             [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
             self.ctx._gateway._gateway_client)
         self.ctx._pickled_broadcast_vars.clear()
-        class_tag = self._prev_jrdd.classTag()
         env = MapConverter().convert(self.ctx.environment,
                                      self.ctx._gateway._gateway_client)
         includes = ListConverter().convert(self.ctx._python_includes,
@@ -1696,8 +1695,7 @@ class PipelinedRDD(RDD):
                                              bytearray(pickled_command),
                                              env, includes, self.preservesPartitioning,
                                              self.ctx.pythonExec,
-                                             broadcast_vars, self.ctx._javaAccumulator,
-                                             class_tag)
+                                             broadcast_vars, self.ctx._javaAccumulator)
         self._jrdd_val = python_rdd.asJavaRDD()
         return self._jrdd_val
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba46bbed/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index a92abbf..8ba5146 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -226,6 +226,15 @@ class TestRDDFunctions(PySparkTestCase):
         cart = rdd1.cartesian(rdd2)
         result = cart.map(lambda (x, y): x + y).collect()
 
+    def test_transforming_pickle_file(self):
+        # Regression test for SPARK-2601
+        data = self.sc.parallelize(["Hello", "World!"])
+        tempFile = tempfile.NamedTemporaryFile(delete=True)
+        tempFile.close()
+        data.saveAsPickleFile(tempFile.name)
+        pickled_file = self.sc.pickleFile(tempFile.name)
+        pickled_file.map(lambda x: x).collect()
+
     def test_cartesian_on_textfile(self):
         # Regression test for
         path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")