Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/24 04:08:47 UTC

[1/3] git commit: Fix SPARK-1034: Py4JException on PySpark Cartesian Result

Updated Branches:
  refs/heads/master fad6aacfb -> cad3002fe


Fix SPARK-1034: Py4JException on PySpark Cartesian Result

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0035dbbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0035dbbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0035dbbc

Branch: refs/heads/master
Commit: 0035dbbc8125af94ae27cb6f10e87aa6f5a078b1
Parents: a2b47da
Author: Josh Rosen <jo...@apache.org>
Authored: Thu Jan 23 13:05:59 2014 -0800
Committer: Josh Rosen <jo...@apache.org>
Committed: Thu Jan 23 13:05:59 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/api/java/JavaPairRDD.scala    | 3 +--
 python/pyspark/tests.py                                       | 7 +++++++
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0035dbbc/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 0fb7e19..f430a33 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -49,8 +49,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
 
   override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
 
-  override val classTag: ClassTag[(K, V)] =
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+  override val classTag: ClassTag[(K, V)] = rdd.elementClassTag
 
   import JavaPairRDD._
 

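For context: the removed lines built the pair RDD's ClassTag by casting the generic tag for AnyRef, which throws away the element type at runtime (the tag's runtimeClass stays java.lang.Object), while the added line reuses the tag the wrapped RDD already carries. A minimal standalone sketch, in plain Scala and not part of the commit, of what the cast discards:

    import scala.reflect.ClassTag

    object ClassTagDemo {
      def main(args: Array[String]): Unit = {
        // Casting the AnyRef tag, as the old code did, keeps AnyRef's runtime class:
        val casted = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[(Int, Int)]]
        println(casted.runtimeClass)  // class java.lang.Object -- the tuple type is gone

        // A tag summoned for the actual element type reports the right class:
        val real = implicitly[ClassTag[(Int, Int)]]
        println(real.runtimeClass)    // class scala.Tuple2
      }
    }
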
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0035dbbc/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 7acb6ea..05a9f7f 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -152,6 +152,13 @@ class TestRDDFunctions(PySparkTestCase):
         raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*")))
         self.assertEqual(x, unicode(raw_contents.strip(), "utf-8"))
 
+    def test_transforming_cartesian_result(self):
+        # Regression test for SPARK-1034
+        rdd1 = self.sc.parallelize([1, 2])
+        rdd2 = self.sc.parallelize([3, 4])
+        cart = rdd1.cartesian(rdd2)
+        result = cart.map(lambda (x, y): x + y).collect()
+
 
 class TestIO(PySparkTestCase):
 


[3/3] git commit: Merge pull request #501 from JoshRosen/cartesian-rdd-fixes

Posted by pw...@apache.org.
Merge pull request #501 from JoshRosen/cartesian-rdd-fixes

Fix two bugs in PySpark cartesian(): SPARK-978 and SPARK-1034

This pull request fixes two bugs in PySpark's `cartesian()` method:

- [SPARK-978](https://spark-project.atlassian.net/browse/SPARK-978): PySpark's cartesian method throws a ClassCastException
- [SPARK-1034](https://spark-project.atlassian.net/browse/SPARK-1034): Py4JException on PySpark Cartesian Result

The JIRAs describe the fixes in more detail.
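
Both bugs surface through the same operation: cartesian() pairs every element of one RDD with every element of another, so each result element is a Tuple2, and it is these tuples that the Java and Python layers must describe and serialize correctly. A small usage sketch (standard Spark API in local mode; the object and app names are made up for illustration):

    import org.apache.spark.{SparkConf, SparkContext}

    object CartesianDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local").setAppName("cartesian-demo"))
        // The result type is RDD[(Int, Int)]; its four elements are the pairs
        // (1,3), (1,4), (2,3), (2,4).
        val pairs = sc.parallelize(Seq(1, 2)).cartesian(sc.parallelize(Seq(3, 4)))
        pairs.collect().foreach(println)
        sc.stop()
      }
    }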


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/cad3002f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/cad3002f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/cad3002f

Branch: refs/heads/master
Commit: cad3002fead89d3c9a8de4fa989e88f367bc0b05
Parents: fad6aac 6156990
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Jan 23 19:08:34 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 23 19:08:34 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaPairRDD.scala |  3 +-
 .../org/apache/spark/api/python/PythonRDD.scala | 59 +++++++++++++-------
 python/pyspark/tests.py                         | 16 ++++++
 3 files changed, 56 insertions(+), 22 deletions(-)
----------------------------------------------------------------------



[2/3] git commit: Fix SPARK-978: ClassCastException in PySpark cartesian.

Posted by pw...@apache.org.
Fix SPARK-978: ClassCastException in PySpark cartesian.

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/61569906
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/61569906
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/61569906

Branch: refs/heads/master
Commit: 61569906ccafe4f1d10a61882d564e4bb16665ef
Parents: 0035dbb
Author: Josh Rosen <jo...@apache.org>
Authored: Thu Jan 23 15:09:19 2014 -0800
Committer: Josh Rosen <jo...@apache.org>
Committed: Thu Jan 23 15:09:19 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala | 59 +++++++++++++-------
 python/pyspark/tests.py                         |  9 +++
 2 files changed, 48 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


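The ClassCastException fixed here came from the pattern match in the old writeToStream, shown in the diff below: because of type erasure, the case "pair: (Array[Byte], Array[Byte])" only checks that the value is a Tuple2 at runtime, so a (String, String) pair (for example, from a cartesian of two text files) matched the byte-array case and failed on the implied cast. A standalone sketch of the pitfall, in plain Scala and not part of the commit:

    object ErasureDemo {
      def main(args: Array[String]): Unit = {
        val elem: Any = ("hello", "world")  // actually a (String, String) pair
        try {
          elem match {
            // Erasure: at runtime this only asks "is it a Tuple2?", so it matches.
            case pair: (Array[Byte], Array[Byte]) =>
              println(pair._1.length)  // the cast of pair._1 to Array[Byte] fails here
          }
        } catch {
          case e: ClassCastException =>
            println("ClassCastException: " + e.getMessage)
        }
      }
    }
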
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/61569906/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 82527fe..57bde8d 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -78,9 +78,7 @@ private[spark] class PythonRDD[T: ClassTag](
           dataOut.writeInt(command.length)
           dataOut.write(command)
           // Data values
-          for (elem <- parent.iterator(split, context)) {
-            PythonRDD.writeToStream(elem, dataOut)
-          }
+          PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
           dataOut.flush()
           worker.shutdownOutput()
         } catch {
@@ -206,20 +204,43 @@ private[spark] object PythonRDD {
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
 
-  def writeToStream(elem: Any, dataOut: DataOutputStream) {
-    elem match {
-      case bytes: Array[Byte] =>
-        dataOut.writeInt(bytes.length)
-        dataOut.write(bytes)
-      case pair: (Array[Byte], Array[Byte]) =>
-        dataOut.writeInt(pair._1.length)
-        dataOut.write(pair._1)
-        dataOut.writeInt(pair._2.length)
-        dataOut.write(pair._2)
-      case str: String =>
-        dataOut.writeUTF(str)
-      case other =>
-        throw new SparkException("Unexpected element type " + other.getClass)
+  def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
+    // The right way to implement this would be to use TypeTags to get the full
+    // type of T.  Since I don't want to introduce breaking changes throughout the
+    // entire Spark API, I have to use this hacky approach:
+    if (iter.hasNext) {
+      val first = iter.next()
+      val newIter = Seq(first).iterator ++ iter
+      first match {
+        case arr: Array[Byte] =>
+          newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes =>
+            dataOut.writeInt(bytes.length)
+            dataOut.write(bytes)
+          }
+        case string: String =>
+          newIter.asInstanceOf[Iterator[String]].foreach { str =>
+            dataOut.writeUTF(str)
+          }
+        case pair: Tuple2[_, _] =>
+          pair._1 match {
+            case bytePair: Array[Byte] =>
+              newIter.asInstanceOf[Iterator[Tuple2[Array[Byte], Array[Byte]]]].foreach { pair =>
+                dataOut.writeInt(pair._1.length)
+                dataOut.write(pair._1)
+                dataOut.writeInt(pair._2.length)
+                dataOut.write(pair._2)
+              }
+            case stringPair: String =>
+              newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach { pair =>
+                dataOut.writeUTF(pair._1)
+                dataOut.writeUTF(pair._2)
+              }
+            case other =>
+              throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
+          }
+        case other =>
+          throw new SparkException("Unexpected element type " + first.getClass)
+      }
     }
   }
 
@@ -230,9 +251,7 @@ private[spark] object PythonRDD {
 
   def writeToFile[T](items: Iterator[T], filename: String) {
     val file = new DataOutputStream(new FileOutputStream(filename))
-    for (item <- items) {
-      writeToStream(item, file)
-    }
+    writeIteratorToStream(items, file)
     file.close()
   }
 

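The replacement avoids that unchecked match by looking only at the first element's concrete class, choosing one write path for the whole stream, and re-attaching the consumed element to the front of the iterator so it is still written. A minimal standalone sketch of this peek-and-dispatch pattern (the describe helper is made up for illustration):

    object PeekDemo {
      // Consume one element to learn the runtime type, then prepend it back so
      // the full sequence is still processed.
      def describe[T](iter: Iterator[T]): String = {
        if (!iter.hasNext) return "empty"
        val first = iter.next()
        val restored = Seq(first).iterator ++ iter  // re-attach the consumed element
        first match {
          case _: Array[Byte] => restored.size + " byte arrays"
          case _: String      => restored.size + " strings"
          case other          => "unsupported element type: " + other.getClass
        }
      }

      def main(args: Array[String]): Unit =
        println(describe(Iterator("a", "b", "c")))  // prints: 3 strings
    }
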
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/61569906/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 05a9f7f..acd1ca5 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -159,6 +159,15 @@ class TestRDDFunctions(PySparkTestCase):
         cart = rdd1.cartesian(rdd2)
         result = cart.map(lambda (x, y): x + y).collect()
 
+    def test_cartesian_on_textfile(self):
+        # Regression test for SPARK-978
+        path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
+        a = self.sc.textFile(path)
+        result = a.cartesian(a).collect()
+        (x, y) = result[0]
+        self.assertEqual("Hello World!", x.strip())
+        self.assertEqual("Hello World!", y.strip())
+
 
 class TestIO(PySparkTestCase):