You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/02/27 20:12:32 UTC

git commit: SPARK 1084.1 (resubmitted)

Repository: spark
Updated Branches:
  refs/heads/master aace2c097 -> 12bbca206


SPARK 1084.1 (resubmitted)

(Ported from https://github.com/apache/incubator-spark/pull/637 )

Author: Sean Owen <so...@cloudera.com>

Closes #31 from srowen/SPARK-1084.1 and squashes the following commits:

6c4a32c [Sean Owen] Suppress warnings about legitimate unchecked array creations, or change code to avoid it
f35b833 [Sean Owen] Fix two misc javadoc problems
254e8ef [Sean Owen] Fix one new style error introduced in scaladoc warning commit
5b2fce2 [Sean Owen] Fix scaladoc invocation warning, and enable javac warnings properly, with plugin config updates
007762b [Sean Owen] Remove dead scaladoc links
b8ff8cb [Sean Owen] Replace deprecated Ant <tasks> with <target>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12bbca20
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12bbca20
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12bbca20

Branch: refs/heads/master
Commit: 12bbca20657c17d5ebfceaacb37dddc851772675
Parents: aace2c0
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Feb 27 11:12:21 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Feb 27 11:12:21 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/bagel/Bagel.scala    |  14 +--
 core/pom.xml                                    |   4 +-
 .../scala/org/apache/spark/SparkContext.scala   |   2 +-
 .../org/apache/spark/scheduler/JobLogger.scala  |   2 +-
 .../spark/util/IndestructibleActorSystem.scala  |   4 +-
 .../org/apache/spark/util/StatCounter.scala     |   6 +-
 .../scala/org/apache/spark/util/Vector.scala    |   2 +-
 .../java/org/apache/spark/JavaAPISuite.java     |  35 ++++--
 pom.xml                                         |   5 +-
 repl/pom.xml                                    |   4 +-
 .../streaming/api/java/JavaPairDStream.scala    |  16 +--
 .../api/java/JavaStreamingContext.scala         |   4 +-
 .../dstream/PairDStreamFunctions.scala          |  16 +--
 .../apache/spark/streaming/JavaAPISuite.java    | 124 +++++++++++++------
 yarn/pom.xml                                    |   4 +-
 15 files changed, 154 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
----------------------------------------------------------------------
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index dd3eed8..70c7474 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -27,7 +27,7 @@ object Bagel extends Logging {
 
   /**
    * Runs a Bagel program.
-   * @param sc [[org.apache.spark.SparkContext]] to use for the program.
+   * @param sc org.apache.spark.SparkContext to use for the program.
    * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
    *                 Key will be the vertex id.
    * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
@@ -38,10 +38,10 @@ object Bagel extends Logging {
    * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
    *                  after each superstep and provides the result to each vertex in the next
    *                  superstep.
-   * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
+   * @param partitioner org.apache.spark.Partitioner partitions values by key
    * @param numPartitions number of partitions across which to split the graph.
    *                      Default is the default parallelism of the SparkContext
-   * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
+   * @param storageLevel org.apache.spark.storage.StorageLevel to use for caching of
    *                    intermediate RDDs in each superstep. Defaults to caching in memory.
    * @param compute function that takes a Vertex, optional set of (possibly combined) messages to
    *                the Vertex, optional Aggregator and the current superstep,
@@ -131,7 +131,7 @@ object Bagel extends Logging {
 
   /**
    * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
-   * [[org.apache.spark.HashPartitioner]] and default storage level
+   * org.apache.spark.HashPartitioner and default storage level
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
@@ -146,7 +146,7 @@ object Bagel extends Logging {
 
   /**
    * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
-   * default [[org.apache.spark.HashPartitioner]]
+   * default org.apache.spark.HashPartitioner
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
@@ -166,7 +166,7 @@ object Bagel extends Logging {
 
   /**
    * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
-   * default [[org.apache.spark.HashPartitioner]],
+   * default org.apache.spark.HashPartitioner,
    * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -180,7 +180,7 @@ object Bagel extends Logging {
 
   /**
    * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
-   * the default [[org.apache.spark.HashPartitioner]]
+   * the default org.apache.spark.HashPartitioner
    * and [[org.apache.spark.bagel.DefaultCombiner]]
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index ebc178a..a333bff 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -225,7 +225,7 @@
                         </goals>
                         <configuration>
                             <exportAntProperties>true</exportAntProperties>
-                            <tasks>
+                            <target>
                                 <property name="spark.classpath" refid="maven.test.classpath" />
                                 <property environment="env" />
                                 <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -238,7 +238,7 @@
                                         </not>
                                     </condition>
                                 </fail>
-                            </tasks>
+                            </target>
                         </configuration>
                     </execution>
                 </executions>

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1f5334f..da778aa 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -351,7 +351,7 @@ class SparkContext(
    * using the older MapReduce API (`org.apache.hadoop.mapred`).
    *
    * @param conf JobConf for setting up the dataset
-   * @param inputFormatClass Class of the [[InputFormat]]
+   * @param inputFormatClass Class of the InputFormat
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
    * @param minSplits Minimum number of Hadoop Splits to generate.

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 9d75d7c..006e2a3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -81,7 +81,7 @@ class JobLogger(val user: String, val logDirName: String)
   /**
    * Create a log file for one job
    * @param jobID ID of the job
-   * @exception FileNotFoundException Fail to create log file
+   * @throws FileNotFoundException Fail to create log file
    */
   protected def createLogWriter(jobID: Int) {
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
index bf71882..c539d2f 100644
--- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@ -23,9 +23,9 @@ import scala.util.control.{ControlThrowable, NonFatal}
 import com.typesafe.config.Config
 
 /**
- * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
+ * An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception
  * This is necessary as Spark Executors are allowed to recover from fatal exceptions
- * (see [[org.apache.spark.executor.Executor]]).
+ * (see org.apache.spark.executor.Executor)
  */
 object IndestructibleActorSystem {
   def apply(name: String, config: Config): ActorSystem =

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/core/src/main/scala/org/apache/spark/util/StatCounter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 5b0d2c3..f837dc7 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -19,9 +19,9 @@ package org.apache.spark.util
 
 /**
  * A class for tracking the statistics of a set of numbers (count, mean and variance) in a
- * numerically robust way. Includes support for merging two StatCounters. Based on 
- * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
- * Welford and Chan's algorithms for running variance]].
+ * numerically robust way. Includes support for merging two StatCounters. Based on Welford
+ * and Chan's [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance algorithms]]
+ * for running variance.
  *
  * @constructor Initialize the StatCounter with the given values.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/core/src/main/scala/org/apache/spark/util/Vector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index d437c05..dc4b8f2 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -136,7 +136,7 @@ object Vector {
 
   /**
    * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers 
-   * between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided.
+   * between 0.0 and 1.0. Optional scala.util.Random number generator can be provided.
    */
   def random(length: Int, random: Random = new XORShiftRandom()) =
     Vector(length, _ => random.nextDouble())

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 20232e9..aa5079c 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -75,8 +75,9 @@ public class JavaAPISuite implements Serializable {
       else if (a < b) return 1;
       else return 0;
     }
-  };
+  }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void sparkContextUnion() {
     // Union of non-specialized JavaRDDs
@@ -148,6 +149,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2, foreachCalls);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void lookup() {
     JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
@@ -179,6 +181,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void cogroup() {
     JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
@@ -197,6 +200,7 @@ public class JavaAPISuite implements Serializable {
     cogrouped.collect();
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void leftOuterJoin() {
     JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
@@ -243,6 +247,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(33, sum);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void foldByKey() {
     List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
@@ -265,6 +270,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void reduceByKey() {
     List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
@@ -320,8 +326,8 @@ public class JavaAPISuite implements Serializable {
   public void take() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
     Assert.assertEquals(1, rdd.first().intValue());
-    List<Integer> firstTwo = rdd.take(2);
-    List<Integer> sample = rdd.takeSample(false, 2, 42);
+    rdd.take(2);
+    rdd.takeSample(false, 2, 42);
   }
 
   @Test
@@ -359,8 +365,8 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
     Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);
 
-    Double first = rdd.first();
-    List<Double> take = rdd.take(5);
+    rdd.first();
+    rdd.take(5);
   }
 
   @Test
@@ -438,11 +444,11 @@ public class JavaAPISuite implements Serializable {
         return lengths;
       }
     });
-    Double x = doubles.first();
-    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+    Assert.assertEquals(5.0, doubles.first(), 0.01);
     Assert.assertEquals(11, pairs.count());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void mapsFromPairsToPairs() {
       List<Tuple2<Integer, String>> pairs = Arrays.asList(
@@ -509,6 +515,7 @@ public class JavaAPISuite implements Serializable {
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void persist() {
     JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
@@ -573,6 +580,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(expected, readRDD.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void sequenceFile() {
     File tempDir = Files.createTempDir();
@@ -602,6 +610,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(pairs, readRDD.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void writeWithNewAPIHadoopFile() {
     File tempDir = Files.createTempDir();
@@ -632,6 +641,7 @@ public class JavaAPISuite implements Serializable {
     }).collect().toString());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void readWithNewAPIHadoopFile() throws IOException {
     File tempDir = Files.createTempDir();
@@ -674,6 +684,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(expected, readRDD.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void objectFilesOfComplexTypes() {
     File tempDir = Files.createTempDir();
@@ -690,6 +701,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(pairs, readRDD.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void hadoopFile() {
     File tempDir = Files.createTempDir();
@@ -719,6 +731,7 @@ public class JavaAPISuite implements Serializable {
     }).collect().toString());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void hadoopFileCompressed() {
     File tempDir = Files.createTempDir();
@@ -824,7 +837,7 @@ public class JavaAPISuite implements Serializable {
       }
     };
 
-    final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+    final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
     rdd.foreach(new VoidFunction<Integer>() {
       public void call(Integer x) {
         floatAccum.add((float) x);
@@ -876,6 +889,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void mapOnPairRDD() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
@@ -900,6 +914,7 @@ public class JavaAPISuite implements Serializable {
 
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void collectPartitions() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
@@ -968,7 +983,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void collectAsMapWithIntArrayValues() {
     // Regression test for SPARK-1040
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
     JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
       @Override
       public Tuple2<Integer, int[]> call(Integer x) throws Exception {
@@ -976,6 +991,6 @@ public class JavaAPISuite implements Serializable {
       }
     });
     pairRDD.collect();  // Works fine
-    Map<Integer, int[]> map = pairRDD.collectAsMap();  // Used to crash with ClassCastException
+    pairRDD.collectAsMap();  // Used to crash with ClassCastException
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6adc670..21060ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -592,12 +592,13 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
-          <version>2.5.1</version>
+          <version>3.1</version>
           <configuration>
             <source>${java.version}</source>
             <target>${java.version}</target>
             <encoding>UTF-8</encoding>
             <maxmem>1024m</maxmem>
+            <fork>true</fork>
           </configuration>
         </plugin>
         <plugin>
@@ -612,7 +613,7 @@
         <plugin>
           <groupId>org.scalatest</groupId>
           <artifactId>scalatest-maven-plugin</artifactId>
-          <version>1.0-M2</version>
+          <version>1.0-RC2</version>
           <configuration>
             <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
             <junitxml>.</junitxml>

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/repl/pom.xml
----------------------------------------------------------------------
diff --git a/repl/pom.xml b/repl/pom.xml
index 73597f6..4c5f972 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -98,7 +98,7 @@
             </goals>
             <configuration>
               <exportAntProperties>true</exportAntProperties>
-              <tasks>
+              <target>
                 <property name="spark.classpath" refid="maven.test.classpath" />
                 <property environment="env" />
                 <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -111,7 +111,7 @@
                     </not>
                   </condition>
                 </fail>
-              </tasks>
+              </target>
             </configuration>
           </execution>
         </executions>

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 4dcd0e4..2c7ff87 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -127,7 +127,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /**
    * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream.
    * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
-   * single sequence to generate the RDDs of the new DStream. [[org.apache.spark.Partitioner]]
+   * single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner
    * is used to control the partitioning of each RDD.
    */
   def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
@@ -151,7 +151,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+   * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
    * thepartitioning of each RDD.
    */
   def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
@@ -161,7 +161,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /**
    * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
    * combineByKey for RDDs. Please refer to combineByKey in
-   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+   * org.apache.spark.rdd.PairRDDFunctions for more information.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
       mergeValue: JFunction2[C, V, C],
@@ -176,7 +176,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /**
    * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
    * combineByKey for RDDs. Please refer to combineByKey in
-   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+   * org.apache.spark.rdd.PairRDDFunctions for more information.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
       mergeValue: JFunction2[C, V, C],
@@ -479,7 +479,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /**
    * Return a new "state" DStream where the state for each key is updated by applying
    * the given function on the previous state of the key and the new values of the key.
-   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    * @param updateFunc State update function. If `this` function returns None, then
    *                   corresponding state key-value pair will be eliminated.
    * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
@@ -579,7 +579,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    */
   def join[W](
       other: JavaPairDStream[K, W],
@@ -619,7 +619,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    */
   def leftOuterJoin[W](
       other: JavaPairDStream[K, W],
@@ -660,7 +660,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
-   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
    * the partitioning of each RDD.
    */
   def rightOuterJoin[W](

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 2268160..b082bb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -406,7 +406,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
    * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
    * In the transform function, convert the JavaRDD corresponding to that JavaDStream to
-   * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+   * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
    */
   def transform[T](
       dstreams: JList[JavaDStream[_]],
@@ -429,7 +429,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
    * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
    * In the transform function, convert the JavaRDD corresponding to that JavaDStream to
-   * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+   * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
    */
   def transform[K, V](
       dstreams: JList[JavaDStream[_]],

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index f3c58ae..2473496 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -65,7 +65,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 
   /**
    * Return a new DStream by applying `groupByKey` on each RDD. The supplied
-   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    */
   def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
     val createCombiner = (v: V) => ArrayBuffer[V](v)
@@ -95,7 +95,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 
   /**
    * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+   * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
    * the partitioning of each RDD.
    */
   def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
@@ -376,7 +376,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
   /**
    * Return a new "state" DStream where the state for each key is updated by applying
    * the given function on the previous state of the key and the new values of the key.
-   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    * @param updateFunc State update function. If `this` function returns None, then
    *                   corresponding state key-value pair will be eliminated.
    * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
@@ -396,7 +396,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
   /**
    * Return a new "state" DStream where the state for each key is updated by applying
    * the given function on the previous state of the key and the new values of each key.
-   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    * @param updateFunc State update function. If `this` function returns None, then
    *                   corresponding state key-value pair will be eliminated. Note, that
    *                   this function may generate a different a tuple with a different key
@@ -453,7 +453,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 
   /**
    * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
-   * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
+   * The supplied org.apache.spark.Partitioner is used to partition the generated RDDs.
    */
   def cogroup[W: ClassTag](
       other: DStream[(K, W)],
@@ -483,7 +483,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 
   /**
    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    */
   def join[W: ClassTag](
       other: DStream[(K, W)],
@@ -518,7 +518,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 
   /**
    * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
-   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
    * the partitioning of each RDD.
    */
   def leftOuterJoin[W: ClassTag](
@@ -554,7 +554,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 
   /**
    * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
-   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
    * the partitioning of each RDD.
    */
   def rightOuterJoin[W: ClassTag](

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 4fbbce9..54a0791 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -19,7 +19,6 @@ package org.apache.spark.streaming;
 
 import scala.Tuple2;
 
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import java.io.*;
@@ -30,7 +29,6 @@ import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import com.google.common.collect.Sets;
 
-import org.apache.spark.SparkConf;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -38,6 +36,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
@@ -45,6 +44,8 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 // serialized, as an alternative to converting these anonymous classes to static inner classes;
 // see http://stackoverflow.com/questions/758570/.
 public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
+
+  @SuppressWarnings("unchecked")
   @Test
   public void testCount() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -64,6 +65,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testMap() {
     List<List<String>> inputData = Arrays.asList(
@@ -87,6 +89,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testWindow() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -108,6 +111,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testWindowWithSlideDuration() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -132,6 +136,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testFilter() {
     List<List<String>> inputData = Arrays.asList(
@@ -155,13 +160,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testRepartitionMorePartitions() {
     List<List<Integer>> inputData = Arrays.asList(
       Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
       Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
-    JavaDStream repartitioned = stream.repartition(4);
+    JavaDStream<Integer> stream =
+        JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
+    JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
+        stream.repartition(4);
     JavaTestUtils.attachTestOutputStream(repartitioned);
     List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
     Assert.assertEquals(2, result.size());
@@ -172,13 +180,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testRepartitionFewerPartitions() {
     List<List<Integer>> inputData = Arrays.asList(
       Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
       Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
-    JavaDStream repartitioned = stream.repartition(2);
+    JavaDStream<Integer> stream =
+        JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
+    JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
+        stream.repartition(2);
     JavaTestUtils.attachTestOutputStream(repartitioned);
     List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
     Assert.assertEquals(2, result.size());
@@ -188,6 +199,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testGlom() {
     List<List<String>> inputData = Arrays.asList(
@@ -206,6 +218,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testMapPartitions() {
     List<List<String>> inputData = Arrays.asList(
@@ -217,16 +230,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList("YANKEESRED SOCKS"));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
-      @Override
-      public Iterable<String> call(Iterator<String> in) {
-        String out = "";
-        while (in.hasNext()) {
-          out = out + in.next().toUpperCase();
-        }
-        return Lists.newArrayList(out);
-      }
-    });
+    JavaDStream<String> mapped = stream.mapPartitions(
+        new FlatMapFunction<Iterator<String>, String>() {
+          @Override
+          public Iterable<String> call(Iterator<String> in) {
+            String out = "";
+            while (in.hasNext()) {
+              out = out + in.next().toUpperCase();
+            }
+            return Lists.newArrayList(out);
+          }
+        });
     JavaTestUtils.attachTestOutputStream(mapped);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -247,6 +261,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testReduce() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -267,6 +282,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testReduceByWindow() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -289,6 +305,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testQueueStream() {
     List<List<Integer>> expected = Arrays.asList(
@@ -312,6 +329,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testTransform() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -344,6 +362,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testVariousTransform() {
     // tests whether all variations of transform can be called from Java
@@ -423,6 +442,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testTransformWith() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -492,6 +512,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   }
 
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testVariousTransformWith() {
     // tests whether all variations of transformWith can be called from Java
@@ -591,6 +612,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     );
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testStreamingContextTransform(){
     List<List<Integer>> stream1input = Arrays.asList(
@@ -658,6 +680,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testFlatMap() {
     List<List<String>> inputData = Arrays.asList(
@@ -683,6 +706,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairFlatMap() {
     List<List<String>> inputData = Arrays.asList(
@@ -718,22 +742,24 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
             new Tuple2<Integer, String>(9, "s")));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
-      @Override
-      public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
-        List<Tuple2<Integer, String>> out = Lists.newArrayList();
-        for (String letter: in.split("(?!^)")) {
-          out.add(new Tuple2<Integer, String>(in.length(), letter));
-        }
-        return out;
-      }
-    });
+    JavaPairDStream<Integer,String> flatMapped = stream.flatMap(
+        new PairFlatMapFunction<String, Integer, String>() {
+          @Override
+          public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
+            List<Tuple2<Integer, String>> out = Lists.newArrayList();
+            for (String letter: in.split("(?!^)")) {
+              out.add(new Tuple2<Integer, String>(in.length(), letter));
+            }
+            return out;
+          }
+        });
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testUnion() {
     List<List<Integer>> inputData1 = Arrays.asList(
@@ -778,6 +804,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
 
   // PairDStream Functions
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairFilter() {
     List<List<String>> inputData = Arrays.asList(
@@ -810,7 +837,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
-  List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+  @SuppressWarnings("unchecked")
+  private List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
       Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
           new Tuple2<String, String>("california", "giants"),
           new Tuple2<String, String>("new york", "yankees"),
@@ -820,7 +848,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
           new Tuple2<String, String>("new york", "rangers"),
           new Tuple2<String, String>("new york", "islanders")));
 
-  List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+  @SuppressWarnings("unchecked")
+  private List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
       Arrays.asList(
           new Tuple2<String, Integer>("california", 1),
           new Tuple2<String, Integer>("california", 3),
@@ -832,6 +861,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
           new Tuple2<String, Integer>("new york", 3),
           new Tuple2<String, Integer>("new york", 1)));
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairMap() { // Maps pair -> pair of different type
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -864,6 +894,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairMapPartitions() { // Maps pair -> pair of different type
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -901,6 +932,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairMap2() { // Maps pair -> single
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -925,6 +957,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
     List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
@@ -967,6 +1000,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairGroupByKey() {
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -989,6 +1023,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairReduceByKey() {
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1013,6 +1048,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testCombineByKey() {
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1043,6 +1079,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testCountByValue() {
     List<List<String>> inputData = Arrays.asList(
@@ -1068,6 +1105,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testGroupByKeyAndWindow() {
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1113,6 +1151,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testReduceByKeyAndWindow() {
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1136,6 +1175,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testUpdateStateByKey() {
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1171,6 +1211,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testReduceByKeyAndWindowWithInverse() {
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1194,6 +1235,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testCountByValueAndWindow() {
     List<List<String>> inputData = Arrays.asList(
@@ -1227,6 +1269,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, unorderedResult);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairTransform() {
     List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
@@ -1271,6 +1314,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairToNormalRDDTransform() {
     List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
@@ -1312,6 +1356,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
   public void testMapValues() {
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
 
@@ -1342,6 +1388,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testFlatMapValues() {
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -1386,6 +1433,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testCoGroup() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1429,6 +1477,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testJoin() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1472,6 +1521,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testLeftOuterJoin() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1503,6 +1553,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testCheckpointMasterRecovery() throws InterruptedException {
     List<List<String>> inputData = Arrays.asList(
@@ -1541,7 +1592,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   }
 
 
-  /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+  /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+  @SuppressWarnings("unchecked")
   @Test
   public void testCheckpointofIndividualStream() throws InterruptedException {
     List<List<String>> inputData = Arrays.asList(
@@ -1581,16 +1633,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   @Test
   public void testSocketString() {
     class Converter extends Function<InputStream, Iterable<String>> {
-      public Iterable<String> call(InputStream in) {
+      public Iterable<String> call(InputStream in) throws IOException {
         BufferedReader reader = new BufferedReader(new InputStreamReader(in));
         List<String> out = new ArrayList<String>();
-        try {
-          while (true) {
-            String line = reader.readLine();
-            if (line == null) { break; }
-            out.add(line);
-          }
-        } catch (IOException e) { }
+        while (true) {
+          String line = reader.readLine();
+          if (line == null) { break; }
+          out.add(line);
+        }
         return out;
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/12bbca20/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
index e7eba36..c0e133d 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -133,7 +133,7 @@
             </goals>
             <configuration>
               <exportAntProperties>true</exportAntProperties>
-              <tasks>
+              <target>
                 <property name="spark.classpath" refid="maven.test.classpath" />
                 <property environment="env" />
                 <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">  
@@ -146,7 +146,7 @@
                     </not>
                   </condition>
                 </fail>
-              </tasks>
+              </target>
             </configuration>
           </execution>
         </executions>