You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/02/04 07:40:33 UTC

git commit: Merge pull request #511 from JoshRosen/SPARK-1040

Updated Branches:
  refs/heads/branch-0.9 2c6c9b9d3 -> 1280e8afd


Merge pull request #511 from JoshRosen/SPARK-1040

Fix ClassCastException in JavaPairRDD.collectAsMap() (SPARK-1040)

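This fixes [SPARK-1040](https://spark-project.atlassian.net/browse/SPARK-1040), an issue where JavaPairRDD.collectAsMap() could sometimes fail with a ClassCastException. The root cause was that the ClassTag for the pair RDDs returned by the Java API's `map` and `flatMap` was created by casting a `ClassTag[AnyRef]`; the fix casts a `ClassTag[Tuple2[_, _]]` instead. I applied the same fix to the Spark Streaming Java APIs. The commit message describes the fix in more detail.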

I also increased the verbosity of JUnit test output under SBT to make it easier to verify that the Java tests are actually running.
(cherry picked from commit c66a2ef1c2dc9c218069b3ce8c39a49e5b92fc16)

Signed-off-by: Patrick Wendell <pw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1280e8af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1280e8af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1280e8af

Branch: refs/heads/branch-0.9
Commit: 1280e8afddac3f01ef5d44ab1bf9df4a672329a0
Parents: 2c6c9b9
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Jan 25 22:36:07 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Feb 3 22:40:29 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala    |  4 ++--
 .../test/scala/org/apache/spark/JavaAPISuite.java  | 17 +++++++++++++++++
 pom.xml                                            |  2 +-
 project/SparkBuild.scala                           |  3 ++-
 .../spark/streaming/api/java/JavaDStreamLike.scala |  4 ++--
 .../spark/streaming/api/java/JavaPairDStream.scala |  2 +-
 6 files changed, 25 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1280e8af/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index ebbbbd8..0818ee4 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -88,7 +88,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to all elements of this RDD.
    */
   def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
   }
 
@@ -119,7 +119,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
     import scala.collection.JavaConverters._
     def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1280e8af/core/src/test/scala/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 23ec6c3..8c573ac 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -387,18 +387,21 @@ public class JavaAPISuite implements Serializable {
         return 1.0 * x;
       }
     }).cache();
+    doubles.collect();
     JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
       @Override
       public Tuple2<Integer, Integer> call(Integer x) {
         return new Tuple2<Integer, Integer>(x, x);
       }
     }).cache();
+    pairs.collect();
     JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
       @Override
       public String call(Integer x) {
         return x.toString();
       }
     }).cache();
+    strings.collect();
   }
 
   @Test
@@ -962,4 +965,18 @@ public class JavaAPISuite implements Serializable {
     }
 
   }
+
+  @Test
+  public void collectAsMapWithIntArrayValues() {
+    // Regression test for SPARK-1040
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
+    JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
+      @Override
+      public Tuple2<Integer, int[]> call(Integer x) throws Exception {
+        return new Tuple2<Integer, int[]>(x, new int[] { x });
+      }
+    });
+    pairRDD.collect();  // Works fine
+    Map<Integer, int[]> map = pairRDD.collectAsMap();  // Used to crash with ClassCastException
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1280e8af/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 150dba8..2a3f8fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -389,7 +389,7 @@
       <dependency>
         <groupId>com.novocode</groupId>
         <artifactId>junit-interface</artifactId>
-        <version>0.9</version>
+        <version>0.10</version>
         <scope>test</scope>
       </dependency>
       <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1280e8af/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b891ffa..788ef12 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -212,12 +212,13 @@ object SparkBuild extends Build {
         "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
         "org.scalatest"    %% "scalatest"       % "1.9.1"  % "test",
         "org.scalacheck"   %% "scalacheck"      % "1.10.0" % "test",
-        "com.novocode"      % "junit-interface" % "0.9"    % "test",
+        "com.novocode"      % "junit-interface" % "0.10"   % "test",
         "org.easymock"      % "easymock"        % "3.1"    % "test",
         "org.mockito"       % "mockito-all"     % "1.8.5"  % "test",
         "commons-io"        % "commons-io"      % "2.4"    % "test"
     ),
 
+    testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
     parallelExecution := true,
     /* Workaround for issue #206 (fixed after SBT 0.11.0) */
     watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1280e8af/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a493a82..64fe204 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -138,7 +138,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /** Return a new DStream by applying a function to all elements of this DStream. */
   def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
   }
 
@@ -159,7 +159,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
     import scala.collection.JavaConverters._
     def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1280e8af/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 79fa6a6..62cfa0a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -745,7 +745,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   override val classTag: ClassTag[(K, V)] =
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+    implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
 }
 
 object JavaPairDStream {