Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/04 07:31:36 UTC

[1/3] [java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs

Repository: spark
Updated Branches:
  refs/heads/master b14ede789 -> 181ec5030
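
For orientation before the diffs: the change has two visible parts. First, the
org.apache.spark.api.java.function types (Function, Function2, PairFunction, and
friends) become interfaces rather than abstract classes, so JDK 8 lambdas can
implement them directly; this is also why wrapper code below now invokes them
through call(...) instead of apply(...). Second, methods that overloaded on the
function subtype (map taking a Function vs. a PairFunction, and likewise
transform, transformWith, flatMap, mapPartitions) are split into distinct
*ToPair names, since a lambda argument cannot disambiguate such overloads. A
minimal before/after sketch of a call site (the `words` stream and its setup
are assumed for illustration, not part of this commit):

    // Pre-JDK-8 style: an anonymous class implementing PairFunction.
    JavaPairDStream<String, Integer> counts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        });

    // JDK 8 style: the same transformation as a lambda, possible once the
    // function types are interfaces.
    JavaPairDStream<String, Integer> counts8 =
        words.mapToPair(s -> new Tuple2<>(s, 1));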


http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 2c7ff87..ac451d1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -17,24 +17,25 @@
 
 package org.apache.spark.streaming.api.java
 
-import java.util.{List => JList}
 import java.lang.{Long => JLong}
+import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3}
-import org.apache.spark.Partitioner
+import com.google.common.base.Optional
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.api.java.{JavaUtils, JavaRDD, JavaPairRDD}
-import org.apache.spark.storage.StorageLevel
-import com.google.common.base.Optional
+import org.apache.spark.Partitioner
+import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
+import org.apache.spark.api.java.JavaPairRDD._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.PairRDDFunctions
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 
 /**
@@ -54,7 +55,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /** Return a new DStream containing only the elements that satisfy a predicate. */
   def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
-    dstream.filter((x => f(x).booleanValue()))
+    dstream.filter((x => f.call(x).booleanValue()))
 
   /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
   def cache(): JavaPairDStream[K, V] = dstream.cache()
@@ -168,8 +169,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       mergeCombiners: JFunction2[C, C, C],
       partitioner: Partitioner
     ): JavaPairDStream[K, C] = {
-    implicit val cm: ClassTag[C] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+    implicit val cm: ClassTag[C] = fakeClassTag
     dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
   }
 
@@ -184,8 +184,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       partitioner: Partitioner,
       mapSideCombine: Boolean
     ): JavaPairDStream[K, C] = {
-    implicit val cm: ClassTag[C] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+    implicit val cm: ClassTag[C] = fakeClassTag
     dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine)
   }
 
@@ -279,7 +278,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                       DStream's batching interval
    */
   def reduceByKeyAndWindow(
-      reduceFunc: Function2[V, V, V],
+      reduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration
     ):JavaPairDStream[K, V] = {
@@ -299,7 +298,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param numPartitions  Number of partitions of each RDD in the new DStream.
    */
   def reduceByKeyAndWindow(
-      reduceFunc: Function2[V, V, V],
+      reduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration,
       numPartitions: Int
@@ -320,7 +319,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                    DStream.
    */
   def reduceByKeyAndWindow(
-      reduceFunc: Function2[V, V, V],
+      reduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration,
       partitioner: Partitioner
@@ -345,8 +344,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                       DStream's batching interval
    */
   def reduceByKeyAndWindow(
-      reduceFunc: Function2[V, V, V],
-      invReduceFunc: Function2[V, V, V],
+      reduceFunc: JFunction2[V, V, V],
+      invReduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration
     ): JavaPairDStream[K, V] = {
@@ -374,8 +373,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                       set this to null if you do not want to filter
    */
   def reduceByKeyAndWindow(
-      reduceFunc: Function2[V, V, V],
-      invReduceFunc: Function2[V, V, V],
+      reduceFunc: JFunction2[V, V, V],
+      invReduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration,
       numPartitions: Int,
@@ -412,8 +411,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                       set this to null if you do not want to filter
    */
   def reduceByKeyAndWindow(
-      reduceFunc: Function2[V, V, V],
-      invReduceFunc: Function2[V, V, V],
+      reduceFunc: JFunction2[V, V, V],
+      invReduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration,
       partitioner: Partitioner,
@@ -453,8 +452,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    */
   def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
   : JavaPairDStream[K, S] = {
-    implicit val cm: ClassTag[S] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+    implicit val cm: ClassTag[S] = fakeClassTag
     dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
   }
 
@@ -471,8 +469,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
       numPartitions: Int)
   : JavaPairDStream[K, S] = {
-    implicit val cm: ClassTag[S] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+    implicit val cm: ClassTag[S] = fakeClassTag
     dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
   }
 
@@ -490,8 +487,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
       partitioner: Partitioner
   ): JavaPairDStream[K, S] = {
-    implicit val cm: ClassTag[S] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+    implicit val cm: ClassTag[S] = fakeClassTag
     dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
   }
 
@@ -501,8 +497,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * 'this' DStream without changing the key.
    */
   def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
-    implicit val cm: ClassTag[U] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+    implicit val cm: ClassTag[U] = fakeClassTag
     dstream.mapValues(f)
   }
 
@@ -524,8 +519,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * of partitions.
    */
   def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
   }
 
@@ -537,8 +531,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       numPartitions: Int
     ): JavaPairDStream[K, (JList[V], JList[W])] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     dstream.cogroup(other.dstream, numPartitions)
            .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
   }
@@ -551,8 +544,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       partitioner: Partitioner
     ): JavaPairDStream[K, (JList[V], JList[W])] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     dstream.cogroup(other.dstream, partitioner)
            .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
   }
@@ -562,8 +554,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
    */
   def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     dstream.join(other.dstream)
   }
 
@@ -572,8 +563,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
    */
   def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     dstream.join(other.dstream, numPartitions)
   }
 
@@ -585,8 +575,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       partitioner: Partitioner
     ): JavaPairDStream[K, (V, W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     dstream.join(other.dstream, partitioner)
   }
 
@@ -596,8 +585,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * number of partitions.
    */
   def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.leftOuterJoin(other.dstream)
     joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
   }
@@ -611,8 +599,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       numPartitions: Int
     ): JavaPairDStream[K, (V, Optional[W])] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions)
     joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
   }
@@ -625,8 +612,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       partitioner: Partitioner
     ): JavaPairDStream[K, (V, Optional[W])] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
     joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
   }
@@ -652,8 +638,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       numPartitions: Int
     ): JavaPairDStream[K, (Optional[V], W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions)
     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
   }
@@ -667,8 +652,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       partitioner: Partitioner
     ): JavaPairDStream[K, (Optional[V], W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
   }
@@ -748,8 +732,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     new JavaDStream[(K, V)](dstream)
   }
 
-  override val classTag: ClassTag[(K, V)] =
-    implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+  override val classTag: ClassTag[(K, V)] = fakeClassTag
 }
 
 object JavaPairDStream {
@@ -758,10 +741,8 @@ object JavaPairDStream {
   }
 
   def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
-    implicit val cmk: ClassTag[K] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val cmv: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+    implicit val cmk: ClassTag[K] = fakeClassTag
+    implicit val cmv: ClassTag[V] = fakeClassTag
     new JavaPairDStream[K, V](dstream.dstream)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b082bb0..c48d754 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -187,7 +187,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
       converter: JFunction[InputStream, java.lang.Iterable[T]],
       storageLevel: StorageLevel)
   : JavaDStream[T] = {
-    def fn = (x: InputStream) => converter.apply(x).toIterator
+    def fn = (x: InputStream) => converter.call(x).toIterator
     implicit val cmt: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     ssc.socketStream(hostname, port, fn, storageLevel)
@@ -431,7 +431,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * In the transform function, convert the JavaRDD corresponding to that JavaDStream to
    * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
    */
-  def transform[K, V](
+  def transformToPair[K, V](
       dstreams: JList[JavaDStream[_]],
       transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]]
     ): JavaPairDStream[K, V] = {
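
The rename above is the JavaStreamingContext instance of the same split: a
transform variant that returns a JavaPairDStream gains a ToPair suffix, and the
test-suite call sites below are updated to match. A hedged sketch of the
DStream-level counterpart (the `stream` of strings and the per-RDD logic are
assumed for illustration):

    // transform(...) still yields a JavaDStream; transformToPair(...) is
    // now the spelling for the pair-producing variant.
    JavaDStream<Integer> lengths =
        stream.transform(rdd -> rdd.map(s -> s.length()));
    JavaPairDStream<String, Integer> byWord =
        stream.transformToPair(rdd -> rdd.mapToPair(s -> new Tuple2<>(s, 1)));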

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 54a0791..e93bf18 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -247,14 +247,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
-  private class IntegerSum extends Function2<Integer, Integer, Integer> {
+  private class IntegerSum implements Function2<Integer, Integer, Integer> {
     @Override
     public Integer call(Integer i1, Integer i2) throws Exception {
       return i1 + i2;
     }
   }
 
-  private class IntegerDifference extends Function2<Integer, Integer, Integer> {
+  private class IntegerDifference implements Function2<Integer, Integer, Integer> {
     @Override
     public Integer call(Integer i1, Integer i2) throws Exception {
       return i1 - i2;
@@ -392,7 +392,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       }
     );
 
-    JavaPairDStream<String, Integer> transformed3 = stream.transform(
+    JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(
         new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
           @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception {
             return null;
@@ -400,7 +400,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         }
     );
 
-    JavaPairDStream<String, Integer> transformed4 = stream.transform(
+    JavaPairDStream<String, Integer> transformed4 = stream.transformToPair(
         new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
           @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
             return null;
@@ -424,7 +424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         }
     );
 
-    JavaPairDStream<String, String> pairTransformed3 = pairStream.transform(
+    JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(
         new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
           @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception {
             return null;
@@ -432,7 +432,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         }
     );
 
-    JavaPairDStream<String, String> pairTransformed4 = pairStream.transform(
+    JavaPairDStream<String, String> pairTransformed4 = pairStream.transformToPair(
         new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
           @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
             return null;
@@ -482,7 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         ssc, stringStringKVStream2, 1);
     JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
 
-    JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
+    JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair(
         pairStream2,
         new Function3<
             JavaPairRDD<String, String>,
@@ -551,7 +551,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         }
     );
 
-    JavaPairDStream<Double, Double> transformed3 = stream1.transformWith(
+    JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair(
         stream2,
         new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
           @Override
@@ -561,7 +561,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         }
     );
 
-    JavaPairDStream<Double, Double> transformed4 = stream1.transformWith(
+    JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair(
         pairStream1,
         new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() {
           @Override
@@ -591,7 +591,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         }
     );
 
-    JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith(
+    JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWithToPair(
         stream2,
         new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
           @Override
@@ -601,7 +601,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         }
     );
 
-    JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith(
+    JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWithToPair(
         pairStream2,
         new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
           @Override
@@ -656,7 +656,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     List<JavaDStream<?>> listOfDStreams2 =
         Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
 
-    JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transform(
+    JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
       listOfDStreams2,
       new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
         public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) {
@@ -671,7 +671,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
               return new Tuple2<Integer, Integer>(i, i);
             }
           };
-          return rdd1.union(rdd2).map(mapToTuple).join(prdd3);
+          return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
         }
       }
     );
@@ -742,17 +742,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
             new Tuple2<Integer, String>(9, "s")));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<Integer,String> flatMapped = stream.flatMap(
-        new PairFlatMapFunction<String, Integer, String>() {
-          @Override
-          public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
-            List<Tuple2<Integer, String>> out = Lists.newArrayList();
-            for (String letter: in.split("(?!^)")) {
-              out.add(new Tuple2<Integer, String>(in.length(), letter));
-            }
-            return out;
+    JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
+      new PairFlatMapFunction<String, Integer, String>() {
+        @Override
+        public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
+          List<Tuple2<Integer, String>> out = Lists.newArrayList();
+          for (String letter: in.split("(?!^)")) {
+            out.add(new Tuple2<Integer, String>(in.length(), letter));
           }
-        });
+          return out;
+        }
+      });
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
@@ -816,7 +816,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = stream.map(
+    JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
         new PairFunction<String, String, Integer>() {
           @Override
           public Tuple2<String, Integer> call(String in) throws Exception {
@@ -880,7 +880,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> reversed = pairStream.map(
+    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
         new PairFunction<Tuple2<String, Integer>, Integer, String>() {
           @Override
           public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
@@ -913,7 +913,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions(
+    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
         new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
           @Override
           public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception {
@@ -983,7 +983,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Tuple2<String, Integer>> stream =
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap(
+    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
         new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
           @Override
           public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
@@ -1228,7 +1228,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
     JavaPairDStream<String, Integer> reduceWindowed =
-        pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000));
+        pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
+          new Duration(2000), new Duration(1000));
     JavaTestUtils.attachTestOutputStream(reduceWindowed);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
@@ -1300,7 +1301,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         ssc, inputData, 1);
     JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
+    JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
         new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
           @Override
           public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
@@ -1632,7 +1633,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
   @Test
   public void testSocketString() {
-    class Converter extends Function<InputStream, Iterable<String>> {
+  
+    class Converter implements Function<InputStream, Iterable<String>> {
       public Iterable<String> call(InputStream in) throws IOException {
         BufferedReader reader = new BufferedReader(new InputStreamReader(in));
         List<String> out = new ArrayList<String>();


[2/3] [java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs

Posted by pw...@apache.org.
http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
index 435a86e..64a3a04 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
@@ -35,7 +35,7 @@ import scala.Tuple2;
  */
 public final class JavaALS {
 
-  static class ParseRating extends Function<String, Rating> {
+  static class ParseRating implements Function<String, Rating> {
     private static final Pattern COMMA = Pattern.compile(",");
 
     @Override
@@ -48,7 +48,7 @@ public final class JavaALS {
     }
   }
 
-  static class FeaturesToString extends Function<Tuple2<Object, double[]>, String> {
+  static class FeaturesToString implements Function<Tuple2<Object, double[]>, String> {
     @Override
     public String call(Tuple2<Object, double[]> element) {
       return element._1() + "," + Arrays.toString(element._2());

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
index 4b2658f..76ebdcc 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
@@ -32,7 +32,7 @@ import java.util.regex.Pattern;
  */
 public final class JavaKMeans {
 
-  static class ParsePoint extends Function<String, double[]> {
+  static class ParsePoint implements Function<String, double[]> {
     private static final Pattern SPACE = Pattern.compile(" ");
 
     @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
index 21586ce..667c72f 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
@@ -34,7 +34,7 @@ import java.util.regex.Pattern;
  */
 public final class JavaLR {
 
-  static class ParsePoint extends Function<String, LabeledPoint> {
+  static class ParsePoint implements Function<String, LabeledPoint> {
     private static final Pattern COMMA = Pattern.compile(",");
     private static final Pattern SPACE = Pattern.compile(" ");
 

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 2ffd351..d704be0 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -89,7 +89,7 @@ public final class JavaKafkaWordCount {
       }
     });
 
-    JavaPairDStream<String, Integer> wordCounts = words.map(
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
       new PairFunction<String, String, Integer>() {
         @Override
         public Tuple2<String, Integer> call(String s) {

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 7777c98..7f68d45 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -69,7 +69,7 @@ public final class JavaNetworkWordCount {
         return Lists.newArrayList(SPACE.split(x));
       }
     });
-    JavaPairDStream<String, Integer> wordCounts = words.map(
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
       new PairFunction<String, String, Integer>() {
         @Override
         public Tuple2<String, Integer> call(String s) {

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index 26c4462..88ad341 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -63,7 +63,7 @@ public final class JavaQueueStream {
 
     // Create the QueueInputDStream and use it do some processing
     JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
-    JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
+    JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
         new PairFunction<Integer, Integer, Integer>() {
           @Override
           public Tuple2<Integer, Integer> call(Integer i) {

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index c989ec0..b254e00 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -75,7 +75,7 @@ object ZeroMQUtils {
     ): JavaDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
     createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
   }
 
@@ -99,7 +99,7 @@ object ZeroMQUtils {
     ): JavaDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
     createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
   }
 
@@ -122,7 +122,7 @@ object ZeroMQUtils {
     ): JavaDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
     createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/extras/README.md
----------------------------------------------------------------------
diff --git a/extras/README.md b/extras/README.md
new file mode 100644
index 0000000..1b4174b
--- /dev/null
+++ b/extras/README.md
@@ -0,0 +1 @@
+This directory contains build components not included by default in Spark's build.

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/extras/java8-tests/README.md
----------------------------------------------------------------------
diff --git a/extras/java8-tests/README.md b/extras/java8-tests/README.md
new file mode 100644
index 0000000..e95b73a
--- /dev/null
+++ b/extras/java8-tests/README.md
@@ -0,0 +1,24 @@
+# Java 8 Test Suites
+
+These tests require having Java 8 installed and are isolated from the main Spark build.
+If Java 8 is not your system's default Java version, you will need to point Spark's build
+to your Java location. The set-up depends a bit on the build system:
+
+* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass
+  `-java-home` to the sbt launch script. If a Java 8 JDK is detected, sbt will automatically
+  include the Java 8 test project.
+
+  `$ JAVA_HOME=/opt/jdk1.8.0/ sbt/sbt clean "test-only org.apache.spark.Java8APISuite"`
+
+* For Maven users:
+
+  Maven can also pick up the Java 8 directory from JAVA_HOME. However, Maven will not
+  automatically detect the presence of a Java 8 JDK, so a special build profile `-Pjava8-tests`
+  must be used.
+
+  `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests`
+  `$ JAVA_HOME=/opt/jdk1.8.0/ mvn test -Pjava8-tests -DwildcardSuites=org.apache.spark.Java8APISuite`
+
+  Note that the above commands can only be run from the project root directory, since this module
+  depends on core and on the test-jars of core and streaming. This means an install step is
+  required to make the test dependencies visible to the Java 8 sub-project.

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/extras/java8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml
new file mode 100644
index 0000000..602f66f
--- /dev/null
+++ b/extras/java8-tests/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>java8-tests_2.10</artifactId>
+  <packaging>pom</packaging>
+  <name>Spark Project Java8 Tests POM</name>
+  
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>com.novocode</groupId>
+      <artifactId>junit-interface</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>java8-tests</id>
+    </profile>
+  </profiles>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <systemPropertyVariables>
+            <!-- For some reason surefire isn't setting this log4j file on the
+                 test classpath automatically. So we add it manually. -->
+            <log4j.configuration>
+              file:src/test/resources/log4j.properties
+            </log4j.configuration>
+          </systemPropertyVariables>
+          <skipTests>false</skipTests>
+          <includes>
+            <include>**/Suite*.java</include>
+            <include>**/*Suite.java</include>
+          </includes>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test-compile-first</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <fork>true</fork>
+          <verbose>true</verbose>
+          <forceJavacCompilerUse>true</forceJavacCompilerUse>
+          <source>1.8</source>
+          <compilerVersion>1.8</compilerVersion>
+          <target>1.8</target>
+          <encoding>UTF-8</encoding>
+          <maxmem>1024m</maxmem>
+        </configuration>
+      </plugin>
+      <plugin>
+        <!-- disabled -->
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>none</phase>
+          </execution>
+          <execution>
+            <id>scala-compile-first</id>
+            <phase>none</phase>
+          </execution>
+          <execution>
+            <id>scala-test-compile-first</id>
+            <phase>none</phase>
+          </execution>
+          <execution>
+            <id>attach-scaladocs</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
new file mode 100644
index 0000000..f672512
--- /dev/null
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.*;
+
+/**
+ * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8
+ * lambda syntax.
+ */
+public class Java8APISuite implements Serializable {
+  static int foreachCalls = 0;
+  private transient JavaSparkContext sc;
+
+  @Before
+  public void setUp() {
+    sc = new JavaSparkContext("local", "JavaAPISuite");
+  }
+
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port");
+  }
+
+  @Test
+  public void foreachWithAnonymousClass() {
+    foreachCalls = 0;
+    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+    rdd.foreach(new VoidFunction<String>() {
+      @Override
+      public void call(String s) {
+        foreachCalls++;
+      }
+    });
+    Assert.assertEquals(2, foreachCalls);
+  }
+
+  @Test
+  public void foreach() {
+    foreachCalls = 0;
+    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+    rdd.foreach((x) -> foreachCalls++);
+    Assert.assertEquals(2, foreachCalls);
+  }
+
+  @Test
+  public void groupBy() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
+    JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
+    Assert.assertEquals(2, oddsAndEvens.count());
+    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
+    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+
+    oddsAndEvens = rdd.groupBy(isOdd, 1);
+    Assert.assertEquals(2, oddsAndEvens.count());
+    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
+    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+  }
+
+  @Test
+  public void leftOuterJoin() {
+    JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(1, 2),
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(3, 1)
+    ));
+    JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<Integer, Character>(1, 'x'),
+      new Tuple2<Integer, Character>(2, 'y'),
+      new Tuple2<Integer, Character>(2, 'z'),
+      new Tuple2<Integer, Character>(4, 'w')
+    ));
+    List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
+      rdd1.leftOuterJoin(rdd2).collect();
+    Assert.assertEquals(5, joined.size());
+    Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
+      rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
+    Assert.assertEquals(3, firstUnmatched._1().intValue());
+  }
+
+  @Test
+  public void foldReduce() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
+
+    int sum = rdd.fold(0, add);
+    Assert.assertEquals(33, sum);
+
+    sum = rdd.reduce(add);
+    Assert.assertEquals(33, sum);
+  }
+
+  @Test
+  public void foldByKey() {
+    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(3, 2),
+      new Tuple2<Integer, Integer>(3, 1)
+    );
+    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
+    Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
+    Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
+    Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
+  }
+
+  @Test
+  public void reduceByKey() {
+    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(3, 2),
+      new Tuple2<Integer, Integer>(3, 1)
+    );
+    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
+    Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
+    Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
+    Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
+
+    Map<Integer, Integer> localCounts = counts.collectAsMap();
+    Assert.assertEquals(1, localCounts.get(1).intValue());
+    Assert.assertEquals(2, localCounts.get(2).intValue());
+    Assert.assertEquals(3, localCounts.get(3).intValue());
+
+    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
+    Assert.assertEquals(1, localCounts.get(1).intValue());
+    Assert.assertEquals(2, localCounts.get(2).intValue());
+    Assert.assertEquals(3, localCounts.get(3).intValue());
+  }
+
+  @Test
+  public void map() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
+    doubles.collect();
+    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x))
+      .cache();
+    pairs.collect();
+    JavaRDD<String> strings = rdd.map(x -> x.toString()).cache();
+    strings.collect();
+  }
+
+  @Test
+  public void flatMap() {
+    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
+      "The quick brown fox jumps over the lazy dog."));
+    JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")));
+
+    Assert.assertEquals("Hello", words.first());
+    Assert.assertEquals(11, words.count());
+
+    JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
+      List<Tuple2<String, String>> pairs2 = new LinkedList<Tuple2<String, String>>();
+      for (String word : s.split(" ")) pairs2.add(new Tuple2<String, String>(word, word));
+      return pairs2;
+    });
+
+    Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
+    Assert.assertEquals(11, pairs.count());
+
+    JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
+      List<Double> lengths = new LinkedList<Double>();
+      for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
+      return lengths;
+    });
+
+    Double x = doubles.first();
+    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+    Assert.assertEquals(11, pairs.count());
+  }
+
+  @Test
+  public void mapsFromPairsToPairs() {
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<Integer, String>(1, "a"),
+      new Tuple2<Integer, String>(2, "aa"),
+      new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
+
+    // Regression test for SPARK-668:
+    JavaPairRDD<String, Integer> swapped =
+      pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()));
+    swapped.collect();
+
+    // There was never a bug here, but it's worth testing:
+    pairRDD.map(item -> item.swap()).collect();
+  }
+
+  @Test
+  public void mapPartitions() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+    JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
+      int sum = 0;
+      while (iter.hasNext()) {
+        sum += iter.next();
+      }
+      return Collections.singletonList(sum);
+    });
+
+    Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
+  }
+
+  @Test
+  public void sequenceFile() {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<Integer, String>(1, "a"),
+      new Tuple2<Integer, String>(2, "aa"),
+      new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+    rdd.mapToPair(pair ->
+      new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())))
+      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+    // Try reading the output back as an object file
+    JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
+      .mapToPair(pair -> new Tuple2<Integer, String>(pair._1().get(), pair._2().toString()));
+    Assert.assertEquals(pairs, readRDD.collect());
+  }
+
+  @Test
+  public void zip() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
+    JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
+    zipped.count();
+  }
+
+  @Test
+  public void zipPartitions() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
+    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
+    FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
+      (Iterator<Integer> i, Iterator<String> s) -> {
+        int sizeI = 0;
+        int sizeS = 0;
+        while (i.hasNext()) {
+          sizeI += 1;
+          i.next();
+        }
+        while (s.hasNext()) {
+          sizeS += 1;
+          s.next();
+        }
+        return Arrays.asList(sizeI, sizeS);
+      };
+    JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
+    Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
+  }
+
+  @Test
+  public void accumulators() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+
+    final Accumulator<Integer> intAccum = sc.intAccumulator(10);
+    rdd.foreach(x -> intAccum.add(x));
+    Assert.assertEquals((Integer) 25, intAccum.value());
+
+    final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+    rdd.foreach(x -> doubleAccum.add((double) x));
+    Assert.assertEquals((Double) 25.0, doubleAccum.value());
+
+    // Try a custom accumulator type
+    AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+      public Float addInPlace(Float r, Float t) {
+        return r + t;
+      }
+
+      public Float addAccumulator(Float r, Float t) {
+        return r + t;
+      }
+
+      public Float zero(Float initialValue) {
+        return 0.0f;
+      }
+    };
+
+    final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+    rdd.foreach(x -> floatAccum.add((float) x));
+    Assert.assertEquals((Float) 25.0f, floatAccum.value());
+
+    // Test the setValue method
+    floatAccum.setValue(5.0f);
+    Assert.assertEquals((Float) 5.0f, floatAccum.value());
+  }
+
+  @Test
+  public void keyBy() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
+    List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect();
+    Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
+    Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+  }
+
+  @Test
+  public void mapOnPairRDD() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    JavaPairRDD<Integer, Integer> rdd2 =
+      rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+    JavaPairRDD<Integer, Integer> rdd3 =
+      rdd2.mapToPair(in -> new Tuple2<Integer, Integer>(in._2(), in._1()));
+    Assert.assertEquals(Arrays.asList(
+      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(0, 2),
+      new Tuple2<Integer, Integer>(1, 3),
+      new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+  }
+
+  @Test
+  public void collectPartitions() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+    JavaPairRDD<Integer, Integer> rdd2 =
+      rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+    List[] parts = rdd1.collectPartitions(new int[]{0});
+    Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+    parts = rdd1.collectPartitions(new int[]{1, 2});
+    Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+    Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
+
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(2, 0)),
+      rdd2.collectPartitions(new int[]{0})[0]);
+
+    parts = rdd2.collectPartitions(new int[]{1, 2});
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+      new Tuple2<Integer, Integer>(4, 0)), parts[0]);
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+      new Tuple2<Integer, Integer>(6, 0),
+      new Tuple2<Integer, Integer>(7, 1)), parts[1]);
+  }
+
+  @Test
+  public void collectAsMapWithIntArrayValues() {
+    // Regression test for SPARK-1040
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
+    JavaPairRDD<Integer, int[]> pairRDD =
+      rdd.mapToPair(x -> new Tuple2<Integer, int[]>(x, new int[]{x}));
+    pairRDD.collect();  // Works fine
+    Map<Integer, int[]> map = pairRDD.collectAsMap();  // Used to crash with ClassCastException
+  }
+}
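
For reference, every lambda in the suite above targets one of the single-abstract-method
interfaces this patch converts the java.function classes into, so each test also has a
pre-Java-8 form using an anonymous class. A minimal sketch of the mapToPair call from
mapOnPairRDD, assuming the call()-based PairFunction interface introduced in this patch:

  JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
    new PairFunction<Integer, Integer, Integer>() {
      @Override
      public Tuple2<Integer, Integer> call(Integer i) {
        // Same pairing as the lambda: key by the element, value by its parity.
        return new Tuple2<Integer, Integer>(i, i % 2);
      }
    });

The mapToPair/flatMapToPair names (rather than overloads of map/flatMap) let the compiler
resolve a bare lambda to a unique functional interface; with both a Function and a
PairFunction overload of map, a lambda argument would be ambiguous.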

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
new file mode 100644
index 0000000..43df0de
--- /dev/null
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -0,0 +1,841 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+
+/**
+ * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using Java 8
+ * lambda syntax.
+ */
+public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
+
+  @Test
+  public void testMap() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("hello", "world"),
+      Arrays.asList("goodnight", "moon"));
+
+    List<List<Integer>> expected = Arrays.asList(
+      Arrays.asList(5, 5),
+      Arrays.asList(9, 4));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> letterCount = stream.map(s -> s.length());
+    JavaTestUtils.attachTestOutputStream(letterCount);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @Test
+  public void testFilter() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("giants", "dodgers"),
+      Arrays.asList("yankees", "red socks"));
+
+    List<List<String>> expected = Arrays.asList(
+      Arrays.asList("giants"),
+      Arrays.asList("yankees"));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
+    JavaTestUtils.attachTestOutputStream(filtered);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @Test
+  public void testMapPartitions() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("giants", "dodgers"),
+      Arrays.asList("yankees", "red socks"));
+
+    List<List<String>> expected = Arrays.asList(
+      Arrays.asList("GIANTSDODGERS"),
+      Arrays.asList("YANKEESRED SOCKS"));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<String> mapped = stream.mapPartitions(in -> {
+      String out = "";
+      while (in.hasNext()) {
+        out = out + in.next().toUpperCase();
+      }
+      return Lists.newArrayList(out);
+    });
+    JavaTestUtils.attachTestOutputStream(mapped);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testReduce() {
+    List<List<Integer>> inputData = Arrays.asList(
+      Arrays.asList(1, 2, 3),
+      Arrays.asList(4, 5, 6),
+      Arrays.asList(7, 8, 9));
+
+    List<List<Integer>> expected = Arrays.asList(
+      Arrays.asList(6),
+      Arrays.asList(15),
+      Arrays.asList(24));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y);
+    JavaTestUtils.attachTestOutputStream(reduced);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testReduceByWindow() {
+    List<List<Integer>> inputData = Arrays.asList(
+      Arrays.asList(1, 2, 3),
+      Arrays.asList(4, 5, 6),
+      Arrays.asList(7, 8, 9));
+
+    List<List<Integer>> expected = Arrays.asList(
+      Arrays.asList(6),
+      Arrays.asList(21),
+      Arrays.asList(39),
+      Arrays.asList(24));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
+      (x, y) -> x - y, new Duration(2000), new Duration(1000));
+    JavaTestUtils.attachTestOutputStream(reducedWindowed);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testTransform() {
+    List<List<Integer>> inputData = Arrays.asList(
+      Arrays.asList(1, 2, 3),
+      Arrays.asList(4, 5, 6),
+      Arrays.asList(7, 8, 9));
+
+    List<List<Integer>> expected = Arrays.asList(
+      Arrays.asList(3, 4, 5),
+      Arrays.asList(6, 7, 8),
+      Arrays.asList(9, 10, 11));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
+
+    JavaTestUtils.attachTestOutputStream(transformed);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @Test
+  public void testVariousTransform() {
+    // tests whether all variations of transform can be called from Java
+
+    List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+
+    List<List<Tuple2<String, Integer>>> pairInputData =
+      Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
+      JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
+
+    JavaDStream<Integer> transformed1 = stream.transform(in -> null);
+    JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null);
+    JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null);
+    JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null);
+    JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null);
+    JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null);
+    JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null);
+    JavaPairDStream<String, String> pairTransformed4 =
+      pairStream.transformToPair((x, time) -> null);
+
+  }
+
+  @Test
+  public void testTransformWith() {
+    List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<String, String>("california", "dodgers"),
+        new Tuple2<String, String>("new york", "yankees")),
+      Arrays.asList(
+        new Tuple2<String, String>("california", "sharks"),
+        new Tuple2<String, String>("new york", "rangers")));
+
+    List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<String, String>("california", "giants"),
+        new Tuple2<String, String>("new york", "mets")),
+      Arrays.asList(
+        new Tuple2<String, String>("california", "ducks"),
+        new Tuple2<String, String>("new york", "islanders")));
+
+
+    List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+      Sets.newHashSet(
+        new Tuple2<String, Tuple2<String, String>>("california",
+          new Tuple2<String, String>("dodgers", "giants")),
+        new Tuple2<String, Tuple2<String, String>>("new york",
+          new Tuple2<String, String>("yankees", "mets"))),
+      Sets.newHashSet(
+        new Tuple2<String, Tuple2<String, String>>("california",
+          new Tuple2<String, String>("sharks", "ducks")),
+        new Tuple2<String, Tuple2<String, String>>("new york",
+          new Tuple2<String, String>("rangers", "islanders"))));
+
+    JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+      ssc, stringStringKVStream1, 1);
+    JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+    JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+      ssc, stringStringKVStream2, 1);
+    JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+    JavaPairDStream<String, Tuple2<String, String>> joined =
+      pairStream1.transformWithToPair(pairStream2, (x, y, z) -> x.join(y));
+
+    JavaTestUtils.attachTestOutputStream(joined);
+    List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+    List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+    for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
+      unorderedResult.add(Sets.newHashSet(res));
+    }
+
+    Assert.assertEquals(expected, unorderedResult);
+  }
+
+
+  @Test
+  public void testVariousTransformWith() {
+    // tests whether all variations of transformWith can be called from Java
+
+    List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
+    List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
+    JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
+    JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
+
+    List<List<Tuple2<String, Integer>>> pairInputData1 =
+      Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+    List<List<Tuple2<Double, Character>>> pairInputData2 =
+      Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+    JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
+      JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
+    JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
+      JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
+
+    JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null);
+    JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1, (x, y, z) -> null);
+
+    JavaPairDStream<Double, Double> transformed3 =
+      stream1.transformWithToPair(stream2, (x, y, z) -> null);
+
+    JavaPairDStream<Double, Double> transformed4 =
+      stream1.transformWithToPair(pairStream1, (x, y, z) -> null);
+
+    JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2, (x, y, z) -> null);
+
+    JavaDStream<Double> pairTransformed2 =
+      pairStream1.transformWith(pairStream1, (x, y, z) -> null);
+
+    JavaPairDStream<Double, Double> pairTransformed3 =
+      pairStream1.transformWithToPair(stream2, (x, y, z) -> null);
+
+    JavaPairDStream<Double, Double> pairTransformed4 =
+      pairStream1.transformWithToPair(pairStream2, (x, y, z) -> null);
+  }
+
+  @Test
+  public void testStreamingContextTransform() {
+    List<List<Integer>> stream1input = Arrays.asList(
+      Arrays.asList(1),
+      Arrays.asList(2)
+    );
+
+    List<List<Integer>> stream2input = Arrays.asList(
+      Arrays.asList(3),
+      Arrays.asList(4)
+    );
+
+    List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
+      Arrays.asList(new Tuple2<Integer, String>(1, "x")),
+      Arrays.asList(new Tuple2<Integer, String>(2, "y"))
+    );
+
+    List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
+      Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
+    );
+
+    JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
+    JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
+    JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
+      JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
+
+    List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+
+    // This is just to test whether this transform to JavaDStream compiles
+    JavaDStream<Long> transformed1 = ssc.transform(
+      listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
+      assert (listOfRDDs.size() == 2);
+      return null;
+    });
+
+    List<JavaDStream<?>> listOfDStreams2 =
+      Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+
+    JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
+      listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
+      assert (listOfRDDs.size() == 3);
+      JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
+      JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
+      JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
+      JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
+      PairFunction<Integer, Integer, Integer> mapToTuple =
+        (Integer i) -> new Tuple2<Integer, Integer>(i, i);
+      return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
+    });
+    JavaTestUtils.attachTestOutputStream(transformed2);
+    List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
+      JavaTestUtils.runStreams(ssc, 2, 2);
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testFlatMap() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("go", "giants"),
+      Arrays.asList("boo", "dodgers"),
+      Arrays.asList("athletics"));
+
+    List<List<String>> expected = Arrays.asList(
+      Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"),
+      Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"),
+      Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)")));
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @Test
+  public void testPairFlatMap() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("giants"),
+      Arrays.asList("dodgers"),
+      Arrays.asList("athletics"));
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, String>(6, "g"),
+        new Tuple2<Integer, String>(6, "i"),
+        new Tuple2<Integer, String>(6, "a"),
+        new Tuple2<Integer, String>(6, "n"),
+        new Tuple2<Integer, String>(6, "t"),
+        new Tuple2<Integer, String>(6, "s")),
+      Arrays.asList(
+        new Tuple2<Integer, String>(7, "d"),
+        new Tuple2<Integer, String>(7, "o"),
+        new Tuple2<Integer, String>(7, "d"),
+        new Tuple2<Integer, String>(7, "g"),
+        new Tuple2<Integer, String>(7, "e"),
+        new Tuple2<Integer, String>(7, "r"),
+        new Tuple2<Integer, String>(7, "s")),
+      Arrays.asList(
+        new Tuple2<Integer, String>(9, "a"),
+        new Tuple2<Integer, String>(9, "t"),
+        new Tuple2<Integer, String>(9, "h"),
+        new Tuple2<Integer, String>(9, "l"),
+        new Tuple2<Integer, String>(9, "e"),
+        new Tuple2<Integer, String>(9, "t"),
+        new Tuple2<Integer, String>(9, "i"),
+        new Tuple2<Integer, String>(9, "c"),
+        new Tuple2<Integer, String>(9, "s")));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
+      List<Tuple2<Integer, String>> out = Lists.newArrayList();
+      for (String letter : s.split("(?!^)")) {
+        out.add(new Tuple2<Integer, String>(s.length(), letter));
+      }
+      return out;
+    });
+
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  /*
+   * Performs an order-invariant comparison of lists representing two RDD streams. This allows
+   * us to account for ordering variation within individual RDDs, which occurs during windowing.
+   */
+  public static <T extends Comparable<T>> void assertOrderInvariantEquals(
+    List<List<T>> expected, List<List<T>> actual) {
+    for (List<T> list : expected) {
+      Collections.sort(list);
+    }
+    for (List<T> list : actual) {
+      Collections.sort(list);
+    }
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testPairFilter() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("giants", "dodgers"),
+      Arrays.asList("yankees", "red socks"));
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
+      Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream =
+      stream.mapToPair(x -> new Tuple2<>(x, x.length()));
+    JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a"));
+    JavaTestUtils.attachTestOutputStream(filtered);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+    Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+      new Tuple2<String, String>("california", "giants"),
+      new Tuple2<String, String>("new york", "yankees"),
+      new Tuple2<String, String>("new york", "mets")),
+    Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+      new Tuple2<String, String>("california", "ducks"),
+      new Tuple2<String, String>("new york", "rangers"),
+      new Tuple2<String, String>("new york", "islanders")));
+
+  List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+    Arrays.asList(
+      new Tuple2<String, Integer>("california", 1),
+      new Tuple2<String, Integer>("california", 3),
+      new Tuple2<String, Integer>("new york", 4),
+      new Tuple2<String, Integer>("new york", 1)),
+    Arrays.asList(
+      new Tuple2<String, Integer>("california", 5),
+      new Tuple2<String, Integer>("california", 5),
+      new Tuple2<String, Integer>("new york", 3),
+      new Tuple2<String, Integer>("new york", 1)));
+
+  @Test
+  public void testPairMap() { // Maps pair -> pair of different type
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, String>(1, "california"),
+        new Tuple2<Integer, String>(3, "california"),
+        new Tuple2<Integer, String>(4, "new york"),
+        new Tuple2<Integer, String>(1, "new york")),
+      Arrays.asList(
+        new Tuple2<Integer, String>(5, "california"),
+        new Tuple2<Integer, String>(5, "california"),
+        new Tuple2<Integer, String>(3, "new york"),
+        new Tuple2<Integer, String>(1, "new york")));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap());
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairMapPartitions() { // Maps pair -> pair of different type
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, String>(1, "california"),
+        new Tuple2<Integer, String>(3, "california"),
+        new Tuple2<Integer, String>(4, "new york"),
+        new Tuple2<Integer, String>(1, "new york")),
+      Arrays.asList(
+        new Tuple2<Integer, String>(5, "california"),
+        new Tuple2<Integer, String>(5, "california"),
+        new Tuple2<Integer, String>(3, "new york"),
+        new Tuple2<Integer, String>(1, "new york")));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
+      LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+      while (in.hasNext()) {
+        Tuple2<String, Integer> next = in.next();
+        out.add(next.swap());
+      }
+      return out;
+    });
+
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairMap2() { // Maps pair -> single
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Integer>> expected = Arrays.asList(
+      Arrays.asList(1, 3, 4, 1),
+      Arrays.asList(5, 5, 3, 1));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaDStream<Integer> mapped = pairStream.map(in -> in._2());
+    JavaTestUtils.attachTestOutputStream(mapped);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
+    List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<String, Integer>("hi", 1),
+        new Tuple2<String, Integer>("ho", 2)),
+      Arrays.asList(
+        new Tuple2<String, Integer>("hi", 1),
+        new Tuple2<String, Integer>("ho", 2)));
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, String>(1, "h"),
+        new Tuple2<Integer, String>(1, "i"),
+        new Tuple2<Integer, String>(2, "h"),
+        new Tuple2<Integer, String>(2, "o")),
+      Arrays.asList(
+        new Tuple2<Integer, String>(1, "h"),
+        new Tuple2<Integer, String>(1, "i"),
+        new Tuple2<Integer, String>(2, "h"),
+        new Tuple2<Integer, String>(2, "o")));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
+      List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+      for (Character s : in._1().toCharArray()) {
+        out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+      }
+      return out;
+    });
+
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairReduceByKey() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<String, Integer>("california", 4),
+        new Tuple2<String, Integer>("new york", 5)),
+      Arrays.asList(
+        new Tuple2<String, Integer>("california", 10),
+        new Tuple2<String, Integer>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+      ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y);
+
+    JavaTestUtils.attachTestOutputStream(reduced);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testCombineByKey() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<String, Integer>("california", 4),
+        new Tuple2<String, Integer>("new york", 5)),
+      Arrays.asList(
+        new Tuple2<String, Integer>("california", 10),
+        new Tuple2<String, Integer>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+      ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i,
+      (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
+
+    JavaTestUtils.attachTestOutputStream(combined);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testReduceByKeyAndWindow() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<String, Integer>("california", 4),
+        new Tuple2<String, Integer>("new york", 5)),
+      Arrays.asList(new Tuple2<String, Integer>("california", 14),
+        new Tuple2<String, Integer>("new york", 9)),
+      Arrays.asList(new Tuple2<String, Integer>("california", 10),
+        new Tuple2<String, Integer>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> reduceWindowed =
+      pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000));
+    JavaTestUtils.attachTestOutputStream(reduceWindowed);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testUpdateStateByKey() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<String, Integer>("california", 4),
+        new Tuple2<String, Integer>("new york", 5)),
+      Arrays.asList(new Tuple2<String, Integer>("california", 14),
+        new Tuple2<String, Integer>("new york", 9)),
+      Arrays.asList(new Tuple2<String, Integer>("california", 14),
+        new Tuple2<String, Integer>("new york", 9)));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
+      int out = 0;
+      if (state.isPresent()) {
+        out = out + state.get();
+      }
+      for (Integer v : values) {
+        out = out + v;
+      }
+      return Optional.of(out);
+    });
+
+    JavaTestUtils.attachTestOutputStream(updated);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testReduceByKeyAndWindowWithInverse() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<String, Integer>("california", 4),
+        new Tuple2<String, Integer>("new york", 5)),
+      Arrays.asList(new Tuple2<String, Integer>("california", 14),
+        new Tuple2<String, Integer>("new york", 9)),
+      Arrays.asList(new Tuple2<String, Integer>("california", 10),
+        new Tuple2<String, Integer>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> reduceWindowed =
+      pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000),
+        new Duration(1000));
+    JavaTestUtils.attachTestOutputStream(reduceWindowed);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairTransform() {
+    List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, Integer>(3, 5),
+        new Tuple2<Integer, Integer>(1, 5),
+        new Tuple2<Integer, Integer>(4, 5),
+        new Tuple2<Integer, Integer>(2, 5)),
+      Arrays.asList(
+        new Tuple2<Integer, Integer>(2, 5),
+        new Tuple2<Integer, Integer>(3, 5),
+        new Tuple2<Integer, Integer>(4, 5),
+        new Tuple2<Integer, Integer>(1, 5)));
+
+    List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, Integer>(1, 5),
+        new Tuple2<Integer, Integer>(2, 5),
+        new Tuple2<Integer, Integer>(3, 5),
+        new Tuple2<Integer, Integer>(4, 5)),
+      Arrays.asList(
+        new Tuple2<Integer, Integer>(1, 5),
+        new Tuple2<Integer, Integer>(2, 5),
+        new Tuple2<Integer, Integer>(3, 5),
+        new Tuple2<Integer, Integer>(4, 5)));
+
+    JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+      ssc, inputData, 1);
+    JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
+
+    JavaTestUtils.attachTestOutputStream(sorted);
+    List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairToNormalRDDTransform() {
+    List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, Integer>(3, 5),
+        new Tuple2<Integer, Integer>(1, 5),
+        new Tuple2<Integer, Integer>(4, 5),
+        new Tuple2<Integer, Integer>(2, 5)),
+      Arrays.asList(
+        new Tuple2<Integer, Integer>(2, 5),
+        new Tuple2<Integer, Integer>(3, 5),
+        new Tuple2<Integer, Integer>(4, 5),
+        new Tuple2<Integer, Integer>(1, 5)));
+
+    List<List<Integer>> expected = Arrays.asList(
+      Arrays.asList(3, 1, 4, 2),
+      Arrays.asList(2, 3, 4, 1));
+
+    JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+      ssc, inputData, 1);
+    JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1()));
+    JavaTestUtils.attachTestOutputStream(firstParts);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testMapValues() {
+    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+    List<List<Tuple2<String, String>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
+        new Tuple2<String, String>("california", "GIANTS"),
+        new Tuple2<String, String>("new york", "YANKEES"),
+        new Tuple2<String, String>("new york", "METS")),
+      Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
+        new Tuple2<String, String>("california", "DUCKS"),
+        new Tuple2<String, String>("new york", "RANGERS"),
+        new Tuple2<String, String>("new york", "ISLANDERS")));
+
+    JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+      ssc, inputData, 1);
+    JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase());
+    JavaTestUtils.attachTestOutputStream(mapped);
+    List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testFlatMapValues() {
+    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+    List<List<Tuple2<String, String>>> expected = Arrays.asList(
+      Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
+        new Tuple2<String, String>("california", "dodgers2"),
+        new Tuple2<String, String>("california", "giants1"),
+        new Tuple2<String, String>("california", "giants2"),
+        new Tuple2<String, String>("new york", "yankees1"),
+        new Tuple2<String, String>("new york", "yankees2"),
+        new Tuple2<String, String>("new york", "mets1"),
+        new Tuple2<String, String>("new york", "mets2")),
+      Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
+        new Tuple2<String, String>("california", "sharks2"),
+        new Tuple2<String, String>("california", "ducks1"),
+        new Tuple2<String, String>("california", "ducks2"),
+        new Tuple2<String, String>("new york", "rangers1"),
+        new Tuple2<String, String>("new york", "rangers2"),
+        new Tuple2<String, String>("new york", "islanders1"),
+        new Tuple2<String, String>("new york", "islanders2")));
+
+    JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+      ssc, inputData, 1);
+    JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+
+    JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> {
+      List<String> out = new ArrayList<String>();
+      out.add(in + "1");
+      out.add(in + "2");
+      return out;
+    });
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+    Assert.assertEquals(expected, result);
+  }
+
+}
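
The same correspondence holds for the streaming suite: each lambda stands in for an
anonymous class over the java.function interfaces. As a minimal sketch, the update
function from testUpdateStateByKey in the pre-lambda style (assuming the call()-based
Function2 interface and the Guava Optional imported above):

  JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
      @Override
      public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
        // Carry forward any existing state, then fold in this batch's values.
        int out = state.isPresent() ? state.get() : 0;
        for (Integer v : values) {
          out += v;
        }
        return Optional.of(out);
      }
    });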

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/extras/java8-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/resources/log4j.properties b/extras/java8-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..180beaa
--- /dev/null
+++ b/extras/java8-tests/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7e28d7c..c59fada 100644
--- a/pom.xml
+++ b/pom.xml
@@ -711,6 +711,31 @@
       </modules>
 
     </profile>
+    <profile>
+      <id>java8-tests</id>
+      <build>
+        <plugins>
+          <!-- Needed for publishing test jars as it is needed by java8-tests -->
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-jar-plugin</artifactId>
+            <version>2.4</version>
+            <executions>
+              <execution>
+                <goals>
+                  <goal>test-jar</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+
+      <modules>
+        <module>extras/java8-tests</module>
+      </modules>
+
+    </profile>
 
     <profile>
       <id>yarn</id>

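As the commit log below notes, this profile is opt-in: the extras/java8-tests module is
built only when Maven is invoked with the -Pjava8-tests flag (and a JDK 8 toolchain), so
default builds remain usable on older JDKs.
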
http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index d45f677..aa17848 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -90,6 +90,14 @@ object SparkBuild extends Build {
   }
   lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client"
   val maybeAvro = if (hadoopVersion.startsWith("0.23.") && isYarnEnabled) Seq("org.apache.avro" % "avro" % "1.7.4") else Seq()
+
+  // Conditionally include the java 8 sub-project
+  lazy val javaVersion = System.getProperty("java.specification.version")
+  lazy val isJava8Enabled = javaVersion.toDouble >= 1.8
+  val maybeJava8Tests = if (isJava8Enabled) Seq[ProjectReference](java8Tests) else Seq[ProjectReference]()
+  lazy val java8Tests = Project("java8-tests", file("extras/java8-tests"), settings = java8TestsSettings).
+    dependsOn(core) dependsOn(streaming % "compile->compile;test->test")
+
   // Conditionally include the yarn sub-project
   lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core)
   lazy val yarn = Project("yarn", file("yarn/stable"), settings = yarnSettings) dependsOn(core)
@@ -118,10 +126,11 @@ object SparkBuild extends Build {
   lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
     .dependsOn(core, mllib, graphx, bagel, streaming, externalTwitter) dependsOn(allExternal: _*)
 
-  // Everything except assembly, tools and examples belong to packageProjects
+  // Everything except assembly, tools, java8Tests and examples belong to packageProjects
   lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef
 
-  lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj)
+  lazy val allProjects = packageProjects ++ allExternalRefs ++
+    Seq[ProjectReference](examples, tools, assemblyProj) ++ maybeJava8Tests
 
   def sharedSettings = Defaults.defaultSettings ++ Seq(
     organization       := "org.apache.spark",
@@ -132,6 +141,7 @@ object SparkBuild extends Build {
     javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
     unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
     retrieveManaged := true,
+    javaHome := Properties.envOrNone("JAVA_HOME").map(file),
     // This is to add convenience of enabling sbt -Dsbt.offline=true for making the build offline.
     offline := "true".equalsIgnoreCase(sys.props("sbt.offline")),
     retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
@@ -370,6 +380,12 @@ object SparkBuild extends Build {
     name := "spark-yarn"
   )
 
+  def java8TestsSettings = sharedSettings ++ Seq(
+    name := "java8-tests",
+    javacOptions := Seq("-target", "1.8", "-source", "1.8"),
+    testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
+  )
+
   // Conditionally include the YARN dependencies because some tools look at all sub-projects and will complain
   // if we refer to nonexistent dependencies (e.g. hadoop-yarn-api from a Hadoop version without YARN).
   def extraYarnSettings = if(isYarnEnabled) yarnEnabledSettings else Seq()

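A standalone Java sketch of the same gate used above (a hypothetical helper, not part of
this patch), to make the java.specification.version check concrete:

  public class Java8Check {
    public static void main(String[] args) {
      // "java.specification.version" is e.g. "1.7" or "1.8"; the SBT build adds the
      // java8-tests sub-project only when this parses to at least 1.8.
      String spec = System.getProperty("java.specification.version");
      boolean enabled = Double.parseDouble(spec) >= 1.8;
      System.out.println("java8-tests enabled: " + enabled);
    }
  }
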
http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/sbt/sbt-launch-lib.bash
----------------------------------------------------------------------
diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash
index d65bbdc..00a6b41 100755
--- a/sbt/sbt-launch-lib.bash
+++ b/sbt/sbt-launch-lib.bash
@@ -16,7 +16,14 @@ declare -a residual_args
 declare -a java_args
 declare -a scalac_args
 declare -a sbt_commands
-declare java_cmd=java
+
+if test -x "$JAVA_HOME/bin/java"; then
+    echo -e "Using $JAVA_HOME as default JAVA_HOME."
+    echo "Note, this will be overridden by -java-home if it is set."
+    declare java_cmd="$JAVA_HOME/bin/java"
+else
+    declare java_cmd=java
+fi
 
 echoerr () {
   echo 1>&2 "$@"
@@ -131,7 +138,7 @@ process_args () {
 
        -sbt-jar) require_arg path "$1" "$2" && sbt_jar="$2" && shift 2 ;;
    -sbt-version) require_arg version "$1" "$2" && sbt_version="$2" && shift 2 ;;
-     -java-home) require_arg path "$1" "$2" && java_cmd="$2/bin/java" && shift 2 ;;
+     -java-home) require_arg path "$1" "$2" && java_cmd="$2/bin/java" && export JAVA_HOME=$2 && shift 2 ;;
 
             -D*) addJava "$1" && shift ;;
             -J*) addJava "${1:2}" && shift ;;

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index e23b725..721d502 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -41,7 +41,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T]
 
   /** Return a new DStream containing only the elements that satisfy a predicate. */
   def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
-    dstream.filter((x => f(x).booleanValue()))
+    dstream.filter((x => f.call(x).booleanValue()))
 
   /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
   def cache(): JavaDStream[T] = dstream.cache()

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 7aa7ead..a85cd04 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -17,19 +17,20 @@
 
 package org.apache.spark.streaming.api.java
 
-import java.util.{List => JList}
+import java.util
 import java.lang.{Long => JLong}
+import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-import org.apache.spark.streaming._
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.function.{Function3 => JFunction3, _}
-import java.util
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaRDDLike}
+import org.apache.spark.api.java.JavaPairRDD._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, _}
 import org.apache.spark.rdd.RDD
-import JavaDStream._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.api.java.JavaDStream._
 import org.apache.spark.streaming.dstream.DStream
 
 trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
@@ -123,23 +124,23 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
    * an array.
    */
-  def glom(): JavaDStream[JList[T]] = {
+  def glom(): JavaDStream[JList[T]] =
     new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
-  }
+
 
 
   /** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */
-  def context(): StreamingContext = dstream.context()
+  def context(): StreamingContext = dstream.context
 
   /** Return a new DStream by applying a function to all elements of this DStream. */
   def map[R](f: JFunction[T, R]): JavaDStream[R] = {
-    new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
+    new JavaDStream(dstream.map(f)(fakeClassTag))(fakeClassTag)
   }
 
   /** Return a new DStream by applying a function to all elements of this DStream. */
-  def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
-    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
-    new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
+  def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+    def cm: ClassTag[(K2, V2)] = fakeClassTag
+    new JavaPairDStream(dstream.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
   /**
@@ -148,19 +149,19 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    */
   def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
-    new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType())
+    def fn = (x: T) => f.call(x).asScala
+    new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
   /**
    * Return a new DStream by applying a function to all elements of this DStream,
    * and then flattening the results
    */
-  def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+  def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
-    new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
+    def fn = (x: T) => f.call(x).asScala
+    def cm: ClassTag[(K2, V2)] = fakeClassTag
+    new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
     /**
@@ -169,8 +170,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * of the RDD.
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
   /**
@@ -178,10 +179,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
    * of the RDD.
    */
-  def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
   : JavaPairDStream[K2, V2] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
   /**
@@ -283,8 +284,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * on each RDD of 'this' DStream.
    */
   def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
-    implicit val cm: ClassTag[U] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+    implicit val cm: ClassTag[U] = fakeClassTag
+
     def scalaTransform (in: RDD[T]): RDD[U] =
       transformFunc.call(wrapRDD(in)).rdd
     dstream.transform(scalaTransform(_))
@@ -295,8 +296,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * on each RDD of 'this' DStream.
    */
   def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
-    implicit val cm: ClassTag[U] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+    implicit val cm: ClassTag[U] = fakeClassTag
+
     def scalaTransform (in: RDD[T], time: Time): RDD[U] =
       transformFunc.call(wrapRDD(in), time).rdd
     dstream.transform(scalaTransform(_, _))
@@ -306,12 +307,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream.
    */
-  def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
+  def transformToPair[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
   JavaPairDStream[K2, V2] = {
-    implicit val cmk: ClassTag[K2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
-    implicit val cmv: ClassTag[V2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+    implicit val cmk: ClassTag[K2] = fakeClassTag
+    implicit val cmv: ClassTag[V2] = fakeClassTag
+
     def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
       transformFunc.call(wrapRDD(in)).rdd
     dstream.transform(scalaTransform(_))
@@ -321,12 +321,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream.
    */
-  def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
+  def transformToPair[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
   JavaPairDStream[K2, V2] = {
-    implicit val cmk: ClassTag[K2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
-    implicit val cmv: ClassTag[V2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+    implicit val cmk: ClassTag[K2] = fakeClassTag
+    implicit val cmv: ClassTag[V2] = fakeClassTag
+
     def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
       transformFunc.call(wrapRDD(in), time).rdd
     dstream.transform(scalaTransform(_, _))
@@ -340,10 +339,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
       other: JavaDStream[U],
       transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
     ): JavaDStream[W] = {
-    implicit val cmu: ClassTag[U] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
-    implicit val cmv: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cmu: ClassTag[U] = fakeClassTag
+    implicit val cmv: ClassTag[W] = fakeClassTag
+
     def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
     dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
@@ -353,16 +351,13 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream and 'other' DStream.
    */
-  def transformWith[U, K2, V2](
+  def transformWithToPair[U, K2, V2](
       other: JavaDStream[U],
       transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
     ): JavaPairDStream[K2, V2] = {
-    implicit val cmu: ClassTag[U] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
-    implicit val cmk2: ClassTag[K2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
-    implicit val cmv2: ClassTag[V2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+    implicit val cmu: ClassTag[U] = fakeClassTag
+    implicit val cmk2: ClassTag[K2] = fakeClassTag
+    implicit val cmv2: ClassTag[V2] = fakeClassTag
     def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
     dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
@@ -376,12 +371,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
       other: JavaPairDStream[K2, V2],
       transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
     ): JavaDStream[W] = {
-    implicit val cmk2: ClassTag[K2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
-    implicit val cmv2: ClassTag[V2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
-    implicit val cmw: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cmk2: ClassTag[K2] = fakeClassTag
+    implicit val cmv2: ClassTag[V2] = fakeClassTag
+    implicit val cmw: ClassTag[W] = fakeClassTag
+
     def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
     dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
@@ -391,18 +384,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream and 'other' DStream.
    */
-  def transformWith[K2, V2, K3, V3](
+  def transformWithToPair[K2, V2, K3, V3](
       other: JavaPairDStream[K2, V2],
       transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
     ): JavaPairDStream[K3, V3] = {
-    implicit val cmk2: ClassTag[K2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
-    implicit val cmv2: ClassTag[V2] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
-    implicit val cmk3: ClassTag[K3] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]]
-    implicit val cmv3: ClassTag[V3] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]]
+    implicit val cmk2: ClassTag[K2] = fakeClassTag
+    implicit val cmv2: ClassTag[V2] = fakeClassTag
+    implicit val cmk3: ClassTag[K3] = fakeClassTag
+    implicit val cmv3: ClassTag[V3] = fakeClassTag
     def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
     dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))
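
[Editor's note] The renames above (transform -> transformToPair, transformWith -> transformWithToPair) matter for JDK 8 because a lambda cannot disambiguate overloads that differ only in the function interface's return type. A minimal sketch of the renamed API under that assumption; the variable names stream and the use of Tuple2 are illustrative only, assuming a JavaDStream<String>:

  // Hypothetical usage of the renamed method with a JDK 8 lambda.
  JavaPairDStream<String, Long> stamped = stream.transformToPair(
      (rdd, time) -> rdd.mapToPair(
          s -> new Tuple2<String, Long>(s, time.milliseconds())));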


[3/3] git commit: [java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs


Author: Prashant Sharma <pr...@imaginea.com>
Author: Patrick Wendell <pw...@gmail.com>

Closes #17 from ScrapCodes/java8-lambdas and squashes the following commits:

95850e6 [Patrick Wendell] Some doc improvements and build changes to the Java 8 patch.
85a954e [Prashant Sharma] Nit. import orderings.
673f7ac [Prashant Sharma] Added support for -java-home as well
80a13e8 [Prashant Sharma] Used fake class tag syntax
26eb3f6 [Prashant Sharma] Patrick's comments on PR.
35d8d79 [Prashant Sharma] Specified java 8 building in the docs
31d4cd6 [Prashant Sharma] Maven build to support -Pjava8-tests flag.
4ab87d3 [Prashant Sharma] Review feedback on the pr
c33dc2c [Prashant Sharma] SPARK-964, Java 8 API Support.
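
[Editor's note] The practical effect of this patch is that the Java function types become single-abstract-method interfaces, so JDK 8 lambdas can target them directly. A minimal before/after sketch, assuming a hypothetical JavaRDD<String> named lines:

  // Pre-JDK-8 style: anonymous inner class implementing Function.
  JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
    public Integer call(String s) { return s.length(); }
  });

  // With JDK 8, the same interface is a valid lambda target.
  JavaRDD<Integer> lengths2 = lines.map(s -> s.length());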


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/181ec503
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/181ec503
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/181ec503

Branch: refs/heads/master
Commit: 181ec5030792a10f3ce77e997d0e2eda9bcd6139
Parents: b14ede7
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Mon Mar 3 22:31:30 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Mar 3 22:31:30 2014 -0800

----------------------------------------------------------------------
 .../java/function/DoubleFlatMapFunction.java    |  27 +
 .../spark/api/java/function/DoubleFunction.java |  27 +
 .../api/java/function/FlatMapFunction.java      |  27 +
 .../api/java/function/FlatMapFunction2.java     |  27 +
 .../spark/api/java/function/Function.java       |  29 +
 .../spark/api/java/function/Function2.java      |  27 +
 .../spark/api/java/function/Function3.java      |  27 +
 .../api/java/function/PairFlatMapFunction.java  |  30 +
 .../spark/api/java/function/PairFunction.java   |  29 +
 .../spark/api/java/function/VoidFunction.java   |  27 +
 .../apache/spark/api/java/JavaDoubleRDD.scala   |   2 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala |  67 +-
 .../org/apache/spark/api/java/JavaRDD.scala     |   2 +-
 .../org/apache/spark/api/java/JavaRDDLike.scala |  82 +-
 .../java/function/DoubleFlatMapFunction.scala   |  30 -
 .../api/java/function/DoubleFunction.scala      |  29 -
 .../api/java/function/FlatMapFunction.scala     |  27 -
 .../api/java/function/FlatMapFunction2.scala    |  27 -
 .../spark/api/java/function/Function.scala      |  31 -
 .../spark/api/java/function/Function2.scala     |  29 -
 .../spark/api/java/function/Function3.scala     |  28 -
 .../api/java/function/PairFlatMapFunction.scala |  36 -
 .../spark/api/java/function/PairFunction.scala  |  33 -
 .../spark/api/java/function/VoidFunction.scala  |  33 -
 .../api/java/function/WrappedFunction1.scala    |  32 -
 .../api/java/function/WrappedFunction2.scala    |  32 -
 .../api/java/function/WrappedFunction3.scala    |  34 -
 .../java/org/apache/spark/JavaAPISuite.java     |  38 +-
 dev/run-tests                                   |  10 +
 docs/building-with-maven.md                     |  12 +
 docs/java-programming-guide.md                  |  56 +-
 .../org/apache/spark/examples/JavaHdfsLR.java   |   6 +-
 .../org/apache/spark/examples/JavaKMeans.java   |   2 +-
 .../org/apache/spark/examples/JavaLogQuery.java |   2 +-
 .../org/apache/spark/examples/JavaPageRank.java |   6 +-
 .../java/org/apache/spark/examples/JavaTC.java  |   6 +-
 .../apache/spark/examples/JavaWordCount.java    |   2 +-
 .../apache/spark/mllib/examples/JavaALS.java    |   4 +-
 .../apache/spark/mllib/examples/JavaKMeans.java |   2 +-
 .../org/apache/spark/mllib/examples/JavaLR.java |   2 +-
 .../streaming/examples/JavaKafkaWordCount.java  |   2 +-
 .../examples/JavaNetworkWordCount.java          |   2 +-
 .../streaming/examples/JavaQueueStream.java     |   2 +-
 .../spark/streaming/zeromq/ZeroMQUtils.scala    |   6 +-
 extras/README.md                                |   1 +
 extras/java8-tests/README.md                    |  24 +
 extras/java8-tests/pom.xml                      | 151 ++++
 .../java/org/apache/spark/Java8APISuite.java    | 391 +++++++++
 .../apache/spark/streaming/Java8APISuite.java   | 841 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |  28 +
 pom.xml                                         |  25 +
 project/SparkBuild.scala                        |  20 +-
 sbt/sbt-launch-lib.bash                         |  11 +-
 .../spark/streaming/api/java/JavaDStream.scala  |   2 +-
 .../streaming/api/java/JavaDStreamLike.scala    | 119 ++-
 .../streaming/api/java/JavaPairDStream.scala    | 101 +--
 .../api/java/JavaStreamingContext.scala         |   4 +-
 .../apache/spark/streaming/JavaAPISuite.java    |  62 +-
 58 files changed, 2083 insertions(+), 688 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
new file mode 100644
index 0000000..57fd0a7
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function that returns zero or more records of type Double from each input record.
+ */
+public interface DoubleFlatMapFunction<T> extends Serializable {
+  public Iterable<Double> call(T t) throws Exception;
+}
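
[Editor's note] Because call returns Iterable<Double>, a lambda passed to the flatMapToDouble method introduced later in this patch only needs to produce a collection. A sketch, assuming java.util.Arrays is imported and lines is a JavaRDD<String>:

  // Emits one Double per input line; the cast widens int to double.
  JavaDoubleRDD lens = lines.flatMapToDouble(
      s -> Arrays.asList((double) s.length()));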

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
new file mode 100644
index 0000000..150144e
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ *  A function that returns Doubles, and can be used to construct DoubleRDDs.
+ */
+public interface DoubleFunction<T> extends Serializable {
+  public double call(T t) throws Exception;
+}
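
[Editor's note] Note that call returns the primitive double, so an int-valued lambda body widens automatically. A sketch against the mapToDouble method added elsewhere in this patch, assuming lines is a JavaRDD<String>:

  // int return value of length() widens to double.
  JavaDoubleRDD lens = lines.mapToDouble(s -> s.length());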

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
new file mode 100644
index 0000000..fa75842
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function that returns zero or more output records from each input record.
+ */
+public interface FlatMapFunction<T, R> extends Serializable {
+  public Iterable<R> call(T t) throws Exception;
+}
\ No newline at end of file
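
[Editor's note] A minimal lambda sketch for this interface, assuming java.util.Arrays is imported and lines is a JavaRDD<String>:

  // One input line fans out to zero or more words.
  JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")));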

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
new file mode 100644
index 0000000..d1fdec0
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function that takes two inputs and returns zero or more output records.
+ */
+public interface FlatMapFunction2<T1, T2, R> extends Serializable {
+  public Iterable<R> call(T1 t1, T2 t2) throws Exception;
+}
\ No newline at end of file
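
[Editor's note] This two-iterator variant backs zipPartitions. A sketch, assuming rdd1 and rdd2 are JavaRDD<String> with matching partitioning and java.util.Arrays is imported:

  JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, (it1, it2) -> {
    int n = 0;
    // Count the combined elements of the co-located partitions.
    while (it1.hasNext()) { it1.next(); n++; }
    while (it2.hasNext()) { it2.next(); n++; }
    return Arrays.asList(n);  // one count per partition pair
  });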

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function.java b/core/src/main/java/org/apache/spark/api/java/function/Function.java
new file mode 100644
index 0000000..d00551b
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for functions whose return types do not create special RDDs. PairFunction and
+ * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
+ * when mapping RDDs of other types.
+ */
+public interface Function<T1, R> extends Serializable {
+  public R call(T1 v1) throws Exception;
+}
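
[Editor's note] Function is the workhorse for map and filter; filter uses Function<T, Boolean>, so a boolean-valued lambda boxes transparently. A sketch, assuming ints is a JavaRDD<Integer>:

  // The comparison yields boolean, which boxes to the Boolean return type.
  JavaRDD<Integer> evens = ints.filter(x -> x % 2 == 0);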

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/Function2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function2.java b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
new file mode 100644
index 0000000..793caaa
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A two-argument function that takes arguments of type T1 and T2 and returns an R.
+ */
+public interface Function2<T1, T2, R> extends Serializable {
+  public R call(T1 v1, T2 v2) throws Exception;
+}
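
[Editor's note] Function2 is the shape expected by reduce and aggregate. A sketch, assuming ints is a JavaRDD<Integer>:

  // Integers unbox for the addition and the result boxes back.
  Integer sum = ints.reduce((a, b) -> a + b);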

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/Function3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function3.java b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
new file mode 100644
index 0000000..b4151c3
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
+ */
+public interface Function3<T1, T2, T3, R> extends Serializable {
+  public R call(T1 v1, T2 v2, T3 v3) throws Exception;
+}
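
[Editor's note] Function3 is used by the streaming transformWith variants shown earlier in this series, where the third argument is the batch Time. A sketch, assuming stream and other are both JavaDStream<String>:

  // Combine the per-batch RDDs of two streams; the Time argument is unused here.
  JavaDStream<String> merged = stream.transformWith(other,
      (rdd1, rdd2, time) -> rdd1.union(rdd2));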

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
new file mode 100644
index 0000000..691ef2e
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+import scala.Tuple2;
+
+/**
+ * A function that returns zero or more key-value pair records from each input record. The
+ * key-value pairs are represented as scala.Tuple2 objects.
+ */
+public interface PairFlatMapFunction<T, K, V> extends Serializable {
+  public Iterable<Tuple2<K, V>> call(T t) throws Exception;
+}
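
[Editor's note] A sketch against the flatMapToPair method introduced by this patch, assuming java.util.Arrays and scala.Tuple2 are imported and lines is a JavaRDD<String>:

  // Each line yields a (line, length) pair; asList satisfies the Iterable return.
  JavaPairRDD<String, Integer> pairs = lines.flatMapToPair(
      s -> Arrays.asList(new Tuple2<String, Integer>(s, s.length())));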

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
new file mode 100644
index 0000000..abd9bcc
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+import scala.Tuple2;
+
+/**
+ * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
+ */
+public interface PairFunction<T, K, V> extends Serializable {
+  public Tuple2<K, V> call(T t) throws Exception;
+}
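
[Editor's note] A sketch against the mapToPair method introduced by this patch, assuming scala.Tuple2 is imported and words is a JavaRDD<String>:

  // The classic word-count pairing step as a lambda.
  JavaPairRDD<String, Integer> ones = words.mapToPair(
      w -> new Tuple2<String, Integer>(w, 1));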

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
new file mode 100644
index 0000000..2a10435
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function with no return value.
+ */
+public interface VoidFunction<T> extends Serializable {
+  public void call(T t) throws Exception;
+}
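
[Editor's note] Since call returns void, a statement lambda fits directly. A sketch, assuming lines is a JavaRDD<String>:

  // Runs on the executors for each element; side effect only, no return value.
  lines.foreach(s -> System.out.println(s));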

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index 0710444..d178706 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -83,7 +83,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
   def filter(f: JFunction[JDouble, java.lang.Boolean]): JavaDoubleRDD =
-    fromRDD(srdd.filter(x => f(x).booleanValue()))
+    fromRDD(srdd.filter(x => f.call(x).booleanValue()))
 
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 3f67290..857626f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
 import org.apache.spark.SparkContext.rddToPairRDDFunctions
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
 import org.apache.spark.storage.StorageLevel
@@ -89,7 +89,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
   def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
+    new JavaPairRDD[K, V](rdd.filter(x => f.call(x).booleanValue()))
 
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
@@ -165,9 +165,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Simplified version of combineByKey that hash-partitions the output RDD.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
-    mergeValue: JFunction2[C, V, C],
-    mergeCombiners: JFunction2[C, C, C],
-    numPartitions: Int): JavaPairRDD[K, C] =
+      mergeValue: JFunction2[C, V, C],
+      mergeCombiners: JFunction2[C, C, C],
+      numPartitions: Int): JavaPairRDD[K, C] =
     combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
 
   /**
@@ -442,7 +442,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    */
   def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: V) => f.apply(x).asScala
+    def fn = (x: V) => f.call(x).asScala
     implicit val ctag: ClassTag[U] = fakeClassTag
     fromRDD(rdd.flatMapValues(fn))
   }
@@ -511,49 +511,49 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    conf: JobConf) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      conf: JobConf) {
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
   }
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F]) {
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
   }
 
   /** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    codec: Class[_ <: CompressionCodec]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      codec: Class[_ <: CompressionCodec]) {
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec)
   }
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    conf: Configuration) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      conf: Configuration) {
     rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
   }
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F]) {
     rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
   }
 
@@ -700,6 +700,15 @@ object JavaPairRDD {
 
   implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
 
+  private[spark]
+  implicit def toScalaFunction2[T1, T2, R](fun: JFunction2[T1, T2, R]): Function2[T1, T2, R] = {
+    (x: T1, x1: T2) => fun.call(x, x1)
+  }
+
+  private[spark] implicit def toScalaFunction[T, R](fun: JFunction[T, R]): T => R = x => fun.call(x)
+
+  private[spark]
+  implicit def pairFunToScalaFun[A, B, C](x: PairFunction[A, B, C]): A => (B, C) = y => x.call(y)
 
   /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
   def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index d7ce8fd..e973c46 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -70,7 +70,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
   def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
-    wrapRDD(rdd.filter((x => f(x).booleanValue())))
+    wrapRDD(rdd.filter((x => f.call(x).booleanValue())))
 
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 729668f..af0114b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -67,7 +67,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to all elements of this RDD.
    */
   def map[R](f: JFunction[T, R]): JavaRDD[R] =
-    new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType())
+    new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
@@ -82,15 +82,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to all elements of this RDD.
    */
-  def map[R](f: DoubleFunction[T]): JavaDoubleRDD =
-    new JavaDoubleRDD(rdd.map(x => f(x).doubleValue()))
+  def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = {
+    new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue()))
+  }
 
   /**
    * Return a new RDD by applying a function to all elements of this RDD.
    */
-  def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    val ctag = implicitly[ClassTag[Tuple2[K2, V2]]]
-    new JavaPairRDD(rdd.map(f)(ctag))(f.keyType(), f.valueType())
+  def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
+    def cm = implicitly[ClassTag[(K2, V2)]]
+    new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
   /**
@@ -99,17 +100,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
-    JavaRDD.fromRDD(rdd.flatMap(fn)(f.elementType()))(f.elementType())
+    def fn = (x: T) => f.call(x).asScala
+    JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
   /**
    *  Return a new RDD by first applying a function to all elements of this
    *  RDD, and then flattening the results.
    */
-  def flatMap(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
+  def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
+    def fn = (x: T) => f.call(x).asScala
     new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue()))
   }
 
@@ -117,19 +118,19 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    *  Return a new RDD by first applying a function to all elements of this
    *  RDD, and then flattening the results.
    */
-  def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
+  def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
-    val ctag = implicitly[ClassTag[Tuple2[K2, V2]]]
-    JavaPairRDD.fromRDD(rdd.flatMap(fn)(ctag))(f.keyType(), f.valueType())
+    def fn = (x: T) => f.call(x).asScala
+    def cm = implicitly[ClassTag[(K2, V2)]]
+    JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    JavaRDD.fromRDD(rdd.mapPartitions(fn)(f.elementType()))(f.elementType())
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
   /**
@@ -137,52 +138,53 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
       preservesPartitioning: Boolean): JavaRDD[U] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType())
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    JavaRDD.fromRDD(
+      rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
   /**
-    * Return a new RDD by applying a function to each partition of this RDD.
+   * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+  def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
     new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
   }
 
   /**
-    * Return a new RDD by applying a function to each partition of this RDD.
+   * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
   JavaPairRDD[K2, V2] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
-
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]],
-    preservesPartitioning: Boolean): JavaDoubleRDD = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+  def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
+      preservesPartitioning: Boolean): JavaDoubleRDD = {
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
     new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
-      .map((x: java.lang.Double) => x.doubleValue()))
+      .map(x => x.doubleValue()))
   }
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
       preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    JavaPairRDD.fromRDD(
+      rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
   /**
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
-    rdd.foreachPartition((x => f(asJavaIterator(x))))
+    rdd.foreachPartition((x => f.call(asJavaIterator(x))))
   }
 
   /**
@@ -205,7 +207,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
-    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))
+    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
   }
 
   /**
@@ -215,7 +217,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
-    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))
+    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
   }
 
   /**
@@ -255,9 +257,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       other: JavaRDDLike[U, _],
       f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
     def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
-      f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
+      f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
     JavaRDD.fromRDD(
-      rdd.zipPartitions(other.rdd)(fn)(other.classTag, f.elementType()))(f.elementType())
+      rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
   }
 
   // Actions (launch a job to return a value to the user program)
@@ -266,7 +268,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Applies a function f to all elements of this RDD.
    */
   def foreach(f: VoidFunction[T]) {
-    val cleanF = rdd.context.clean(f)
+    val cleanF = rdd.context.clean((x: T) => f.call(x))
     rdd.foreach(cleanF)
   }
 
@@ -320,7 +322,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U],
     combOp: JFunction2[U, U, U]): U =
-    rdd.aggregate(zeroValue)(seqOp, combOp)(seqOp.returnType)
+    rdd.aggregate(zeroValue)(seqOp, combOp)(fakeClassTag[U])
 
   /**
    * Return the number of elements in the RDD.

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
deleted file mode 100644
index 7500a89..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import java.lang.{Double => JDouble, Iterable => JIterable}
-
-/**
- * A function that returns zero or more records of type Double from each input record.
- */
-// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
-abstract class DoubleFlatMapFunction[T] extends WrappedFunction1[T, JIterable[JDouble]]
-   with Serializable {
-   // Intentionally left blank
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
deleted file mode 100644
index 2cdf2e9..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import java.lang.{Double => JDouble}
-
-/**
- * A function that returns Doubles, and can be used to construct DoubleRDDs.
- */
-// DoubleFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and DoubleFunction.
-abstract class DoubleFunction[T] extends WrappedFunction1[T, JDouble] with Serializable {
-    // Intentionally left blank
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
deleted file mode 100644
index bdb01f7..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.reflect.ClassTag
-
-/**
- * A function that returns zero or more output records from each input record.
- */
-abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
-  def elementType(): ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]]
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
deleted file mode 100644
index aae1349..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.reflect.ClassTag
-
-/**
- * A function that takes two inputs and returns zero or more output records.
- */
-abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
-  def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.scala b/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
deleted file mode 100644
index a5e1701..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
-
-/**
- * Base class for functions whose return types do not create special RDDs. PairFunction and
- * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
- * when mapping RDDs of other types.
- */
-abstract class Function[T, R] extends WrappedFunction1[T, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala b/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
deleted file mode 100644
index fa3616c..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
-
-/**
- * A two-argument function that takes arguments of type T1 and T2 and returns an R.
- */
-abstract class Function2[T1, T2, R] extends WrappedFunction2[T1, T2, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala b/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
deleted file mode 100644
index 4515289..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import org.apache.spark.api.java.JavaSparkContext
-import scala.reflect.ClassTag
-
-/**
- * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
- */
-abstract class Function3[T1, T2, T3, R] extends WrappedFunction3[T1, T2, T3, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
deleted file mode 100644
index 8467bbb..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import java.lang.{Iterable => JIterable}
-import org.apache.spark.api.java.JavaSparkContext
-import scala.reflect.ClassTag
-
-/**
- * A function that returns zero or more key-value pair records from each input record. The
- * key-value pairs are represented as scala.Tuple2 objects.
- */
-// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and PairFlatMapFunction.
-abstract class PairFlatMapFunction[T, K, V] extends WrappedFunction1[T, JIterable[(K, V)]]
-  with Serializable {
-
-  def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag
-
-  def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
deleted file mode 100644
index d0ba0b6..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
-
-/**
- * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
- */
-// PairFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and PairFunction.
-abstract class PairFunction[T, K, V] extends WrappedFunction1[T, (K, V)] with Serializable {
-
-  def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag
-
-  def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag
-}

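The one-pair-per-record case follows the same pattern: PairFunction becomes a
plain interface (the keyType/valueType ClassTag plumbing moves to fakeClassTag
on the Scala side) and the overloaded map becomes mapToPair. A sketch under
those assumptions (KeyByLength is an illustrative name):

    import java.util.Arrays;
    import scala.Tuple2;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;

    public class KeyByLength {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "KeyByLength");
        // Build (length, word) pairs; mapToPair replaces the old overloaded map.
        JavaPairRDD<Integer, String> byLen = sc.parallelize(Arrays.asList("spark", "java"))
          .mapToPair(new PairFunction<String, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(String s) {
              return new Tuple2<Integer, String>(s.length(), s);
            }
          });
        System.out.println(byLen.collect());
        sc.stop();
      }
    }
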
http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
deleted file mode 100644
index ea94313..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-/**
- * A function with no return value.
- */
-// This allows Java users to write void methods without having to return Unit.
-abstract class VoidFunction[T] extends Serializable {
-  @throws(classOf[Exception])
-  def call(t: T) : Unit
-}
-
-// VoidFunction cannot extend AbstractFunction1 (because that would force users to explicitly
-// return Unit), so it is implicitly converted to a Function1[T, Unit]:
-object VoidFunction {
-  implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f.call(x))
-}

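VoidFunction keeps its void call(t) shape as an interface; only the implicit
Function1 conversion above disappears, since the Scala side can now invoke
call() directly. Typical usage is unchanged. A sketch, assuming the usual
JavaRDD.foreach(VoidFunction) signature:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;

    public class PrintEach {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "PrintEach");
        // call() is declared void, so there is no Unit value to fake up.
        sc.parallelize(Arrays.asList(1, 2, 3)).foreach(new VoidFunction<Integer>() {
          @Override
          public void call(Integer i) {
            System.out.println(i);
          }
        });
        sc.stop();
      }
    }
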
http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
deleted file mode 100644
index cfe694f..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.runtime.AbstractFunction1
-
-/**
- * Subclass of Function1 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
-  @throws(classOf[Exception])
-  def call(t: T): R
-
-  final def apply(t: T): R = call(t)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
deleted file mode 100644
index eb9277c..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.runtime.AbstractFunction2
-
-/**
- * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
-  @throws(classOf[Exception])
-  def call(t1: T1, t2: T2): R
-
-  final def apply(t1: T1, t2: T2): R = call(t1, t2)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
deleted file mode 100644
index d314dbd..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.runtime.AbstractFunction3
-
-/**
- * Subclass of Function3 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction3[T1, T2, T3, R]
-  extends AbstractFunction3[T1, T2, T3, R] {
-  @throws(classOf[Exception])
-  def call(t1: T1, t2: T2, t3: T3): R
-
-  final def apply(t1: T1, t2: T2, t3: T3): R = call(t1, t2, t3)
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index aa5079c..c7d0e2d 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -386,14 +386,14 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void map() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+    JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
       @Override
-      public Double call(Integer x) {
+      public double call(Integer x) {
         return 1.0 * x;
       }
     }).cache();
     doubles.collect();
-    JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
+    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(new PairFunction<Integer, Integer, Integer>() {
       @Override
       public Tuple2<Integer, Integer> call(Integer x) {
         return new Tuple2<Integer, Integer>(x, x);
@@ -422,7 +422,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals("Hello", words.first());
     Assert.assertEquals(11, words.count());
 
-    JavaPairRDD<String, String> pairs = rdd.flatMap(
+    JavaPairRDD<String, String> pairs = rdd.flatMapToPair(
       new PairFlatMapFunction<String, String, String>() {
 
         @Override
@@ -436,7 +436,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
     Assert.assertEquals(11, pairs.count());
 
-    JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction<String>() {
+    JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
       @Override
       public Iterable<Double> call(String s) {
         List<Double> lengths = new LinkedList<Double>();
@@ -459,7 +459,7 @@ public class JavaAPISuite implements Serializable {
       JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
 
       // Regression test for SPARK-668:
-      JavaPairRDD<String, Integer> swapped = pairRDD.flatMap(
+      JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
           new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
           @Override
           public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) throws Exception {
@@ -469,7 +469,7 @@ public class JavaAPISuite implements Serializable {
       swapped.collect();
 
       // There was never a bug here, but it's worth testing:
-      pairRDD.map(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
+      pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
           @Override
           public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
               return item.swap();
@@ -592,7 +592,7 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
         return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -601,7 +601,7 @@ public class JavaAPISuite implements Serializable {
 
     // Try reading the output back as an object file
     JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
-      Text.class).map(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
+      Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
       @Override
       public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
         return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString());
@@ -622,7 +622,7 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
         return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -653,7 +653,7 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
         return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -713,7 +713,7 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
         return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -743,7 +743,7 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
         return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -766,9 +766,9 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void zip() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+    JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
       @Override
-      public Double call(Integer x) {
+      public double call(Integer x) {
         return 1.0 * x;
       }
     });
@@ -893,13 +893,13 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void mapOnPairRDD() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(new PairFunction<Integer, Integer, Integer>() {
       @Override
       public Tuple2<Integer, Integer> call(Integer i) throws Exception {
         return new Tuple2<Integer, Integer>(i, i % 2);
       }
     });
-    JavaPairRDD<Integer, Integer> rdd3 = rdd2.map(
+    JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(
         new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
       @Override
       public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
@@ -919,7 +919,7 @@ public class JavaAPISuite implements Serializable {
   public void collectPartitions() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
 
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(new PairFunction<Integer, Integer, Integer>() {
       @Override
       public Tuple2<Integer, Integer> call(Integer i) throws Exception {
         return new Tuple2<Integer, Integer>(i, i % 2);
@@ -984,7 +984,7 @@ public class JavaAPISuite implements Serializable {
   public void collectAsMapWithIntArrayValues() {
     // Regression test for SPARK-1040
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
-    JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
+    JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(new PairFunction<Integer, Integer, int[]>() {
       @Override
       public Tuple2<Integer, int[]> call(Integer x) throws Exception {
         return new Tuple2<Integer, int[]>(x, new int[] { x });

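The renames above are what make these call sites lambda-friendly: once map,
mapToPair and mapToDouble are distinct methods, javac can resolve a lambda from
its target type alone. The suite keeps anonymous classes so it still compiles
on JDK 6/7, but on JDK 8 the same operations could be written as in this
sketch (not part of the patch):

    import java.util.Arrays;
    import scala.Tuple2;
    import org.apache.spark.api.java.JavaDoubleRDD;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class Jdk8Overloads {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "Jdk8Overloads");
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        // Each renamed method gives the lambda an unambiguous target type.
        JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
        JavaPairRDD<Integer, Integer> pairs =
          rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x));
        System.out.println(doubles.collect() + " " + pairs.collect());
        sc.stop();
      }
    }
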
http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/dev/run-tests
----------------------------------------------------------------------
diff --git a/dev/run-tests b/dev/run-tests
index d65a397..cf0b940 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -27,6 +27,16 @@ rm -rf ./work
 # Fail fast
 set -e
 
+if test -x "$JAVA_HOME/bin/java"; then
+    declare java_cmd="$JAVA_HOME/bin/java"
+else 
+    declare java_cmd=java
+fi
+
+JAVA_VERSION=$($java_cmd -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
+[ "$JAVA_VERSION" -ge 18 ] && echo "" || echo "[Warn] Java 8 tests will not run, because JDK version is < 1.8."
+
+
 echo "========================================================================="
 echo "Running Scala style checks"
 echo "========================================================================="

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/docs/building-with-maven.md
----------------------------------------------------------------------
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index ded1292..a982c4d 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -25,6 +25,8 @@ If you don't run this, you may see errors like the following:
 
 You can fix this by setting the `MAVEN_OPTS` variable as discussed before.
 
+*Note: For Java 1.8 and above, this step is not required.*
+
 ## Specifying the Hadoop version ##
 
 Because HDFS is not protocol-compatible across versions, if you want to read from HDFS, you'll need to build Spark against the specific HDFS version in your environment. You can do this through the "hadoop.version" property. If unset, Spark will build against Hadoop 1.0.4 by default.
@@ -76,3 +78,13 @@ The maven build includes support for building a Debian package containing the as
     $ mvn -Pdeb -DskipTests clean package
 
 The debian package can then be found under assembly/target. We added the short commit hash to the file name so that we can distinguish individual packages built for SNAPSHOT versions.
+
+## Running Java 8 test suites ##
+
+To run only the Java 8 tests and nothing else:
+
+    $ mvn install -DskipTests -Pjava8-tests
+
+Java 8 tests are run when the -Pjava8-tests profile is enabled; they will run even when -DskipTests is set.
+For these tests to run, your system must have a JDK 8 installation.
+If JDK 8 is installed but is not the system default, you can set JAVA_HOME to point to it before running the tests.

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/docs/java-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md
index 5c73dbb..6632360 100644
--- a/docs/java-programming-guide.md
+++ b/docs/java-programming-guide.md
@@ -21,15 +21,21 @@ operations (e.g. map) and handling RDDs of different types, as discussed next.
 
 There are a few key differences between the Java and Scala APIs:
 
-* Java does not support anonymous or first-class functions, so functions must
-  be implemented by extending the
+* Before Java 8, Java does not support anonymous or first-class functions, so
+  functions are passed using anonymous classes (or, on Java 8, lambda
+  expressions) that implement the
   [`org.apache.spark.api.java.function.Function`](api/core/index.html#org.apache.spark.api.java.function.Function),
   [`Function2`](api/core/index.html#org.apache.spark.api.java.function.Function2), etc.
-  classes.
+  interfaces.
 * To maintain type safety, the Java API defines specialized Function and RDD
   classes for key-value pairs and doubles. For example, 
   [`JavaPairRDD`](api/core/index.html#org.apache.spark.api.java.JavaPairRDD)
   stores key-value pairs.
+* Some methods are named for the return type of the function they take
+  (whether an anonymous class or a lambda expression):
+  for example, mapToPair(...) and flatMapToPair(...) return a
+  [`JavaPairRDD`](api/core/index.html#org.apache.spark.api.java.JavaPairRDD),
+  while mapToDouble(...) and flatMapToDouble(...) return a
+  [`JavaDoubleRDD`](api/core/index.html#org.apache.spark.api.java.JavaDoubleRDD).
 * RDD methods like `collect()` and `countByKey()` return Java collections types,
   such as `java.util.List` and `java.util.Map`.
 * Key-value pairs, which are simply written as `(key, value)` in Scala, are represented
@@ -53,10 +59,10 @@ each specialized RDD class, so filtering a `PairRDD` returns a new `PairRDD`,
 etc. (this achieves the "same-result-type" principle used by the [Scala collections
 framework](http://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html)).
 
-## Function Classes
+## Function Interfaces
 
-The following table lists the function classes used by the Java API.  Each
-class has a single abstract method, `call()`, that must be implemented.
+The following table lists the function interfaces used by the Java API.  Each
+interface has a single abstract method, `call()`, that must be implemented.
 
 <table class="table">
 <tr><th>Interface</th><th>Function Type</th></tr>
@@ -78,7 +84,6 @@ RDD [storage level](scala-programming-guide.html#rdd-persistence) constants, suc
 declared in the [org.apache.spark.api.java.StorageLevels](api/core/index.html#org.apache.spark.api.java.StorageLevels) class. To
 define your own storage level, you can use StorageLevels.create(...). 
 
-
 # Other Features
 
 The Java API supports other Spark features, including
@@ -86,6 +91,21 @@ The Java API supports other Spark features, including
 [broadcast variables](scala-programming-guide.html#broadcast-variables), and
 [caching](scala-programming-guide.html#rdd-persistence).
 
+# Upgrading From Pre-1.0 Versions of Spark
+
+In version 1.0 of Spark, the Java API was refactored to better support Java 8
+lambda expressions. Users upgrading from older versions of Spark should note
+the following changes:
+
+* All classes in `org.apache.spark.api.java.function.*` have been changed from
+  abstract classes to interfaces. This means that concrete implementations of
+  these `Function` types will need to use `implements` rather than `extends`.
+* Certain transformation functions now have multiple versions depending
+  on the return type. In Spark core, the map functions (map, flatMap, and
+  mapPartitions) have type-specific versions, e.g.
+  [`mapToPair`](api/core/index.html#org.apache.spark.api.java.JavaRDD@mapToPair[K2,V2](f:org.apache.spark.api.java.function.PairFunction[T,K2,V2]):org.apache.spark.api.java.JavaPairRDD[K2,V2])
+  and [`mapToDouble`](api/core/index.html#org.apache.spark.api.java.JavaRDD@mapToDouble[R](f:org.apache.spark.api.java.function.DoubleFunction[T]):org.apache.spark.api.java.JavaDoubleRDD).
+  Spark Streaming also uses the same approach, e.g. [`transformToPair`](api/streaming/index.html#org.apache.spark.streaming.api.java.JavaDStream@transformToPair[K2,V2](transformFunc:org.apache.spark.api.java.function.Function[R,org.apache.spark.api.java.JavaPairRDD[K2,V2]]):org.apache.spark.streaming.api.java.JavaPairDStream[K2,V2]).
 
 # Example
 
@@ -127,11 +147,20 @@ class Split extends FlatMapFunction<String, String> {
 JavaRDD<String> words = lines.flatMap(new Split());
 {% endhighlight %}
 
+Java 8+ users can also write the above `FlatMapFunction` in a more concise way using 
+a lambda expression:
+
+{% highlight java %}
+JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")));
+{% endhighlight %}
+
+This lambda syntax can be used for any anonymous class that implements an
+interface with a single abstract method, which is true of all of Spark's
+function interfaces.
+
 Continuing with the word count example, we map each word to a `(word, 1)` pair:
 
 {% highlight java %}
 import scala.Tuple2;
-JavaPairRDD<String, Integer> ones = words.map(
+JavaPairRDD<String, Integer> ones = words.mapToPair(
   new PairFunction<String, String, Integer>() {
     public Tuple2<String, Integer> call(String s) {
       return new Tuple2(s, 1);
@@ -140,7 +169,7 @@ JavaPairRDD<String, Integer> ones = words.map(
 );
 {% endhighlight %}
 
-Note that `map` was passed a `PairFunction<String, String, Integer>` and
+Note that `mapToPair` was passed a `PairFunction<String, String, Integer>` and
 returned a `JavaPairRDD<String, Integer>`.
 
 To finish the word count program, we will use `reduceByKey` to count the
@@ -164,7 +193,7 @@ possible to chain the RDD transformations, so the word count example could also
 be written as:
 
 {% highlight java %}
 JavaPairRDD<String, Integer> counts = lines.flatMap(
     ...
-  ).map(
+  ).mapToPair(
     ...
@@ -180,10 +209,11 @@ just a matter of style.
 
 We currently provide documentation for the Java API as Scaladoc, in the
 [`org.apache.spark.api.java` package](api/core/index.html#org.apache.spark.api.java.package), because
-some of the classes are implemented in Scala. The main downside is that the types and function
+some of the classes are implemented in Scala. It is important to note that the types and function
 definitions show Scala syntax (for example, `def reduce(func: Function2[T, T]): T` instead of
-`T reduce(Function2<T, T> func)`). 
-We hope to generate documentation with Java-style syntax in the future.
+`T reduce(Function2<T, T> func)`). In addition, Java interfaces are shown with the Scala
+`trait` modifier. We hope to generate documentation with Java-style syntax in the future to
+avoid these quirks.
 
 
 # Where to Go from Here

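Taken together, the guide's upgrade notes mean the whole word count collapses
nicely under JDK 8. A sketch combining the guide's snippets (the input path is
a placeholder):

    import java.util.Arrays;
    import scala.Tuple2;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class LambdaWordCount {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "LambdaWordCount");
        JavaPairRDD<String, Integer> counts = sc.textFile("input.txt")  // placeholder
          .flatMap(line -> Arrays.asList(line.split(" ")))
          .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
          .reduceByKey((a, b) -> a + b);
        System.out.println(counts.collect());
        sc.stop();
      }
    }
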
http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index d552c47..6b49244 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -45,7 +45,7 @@ public final class JavaHdfsLR {
     double y;
   }
 
-  static class ParsePoint extends Function<String, DataPoint> {
+  static class ParsePoint implements Function<String, DataPoint> {
     private static final Pattern SPACE = Pattern.compile(" ");
 
     @Override
@@ -60,7 +60,7 @@ public final class JavaHdfsLR {
     }
   }
 
-  static class VectorSum extends Function2<double[], double[], double[]> {
+  static class VectorSum implements Function2<double[], double[], double[]> {
     @Override
     public double[] call(double[] a, double[] b) {
       double[] result = new double[D];
@@ -71,7 +71,7 @@ public final class JavaHdfsLR {
     }
   }
 
-  static class ComputeGradient extends Function<DataPoint, double[]> {
+  static class ComputeGradient implements Function<DataPoint, double[]> {
     private final double[] weights;
 
     ComputeGradient(double[] weights) {

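The extends-to-implements change in the examples is what allows these named
helper classes to be replaced by lambdas on JDK 8: a single-method interface is
a valid lambda target, whereas an abstract class never is. For instance,
VectorSum above could be inlined as in this sketch (the surrounding class is
illustrative only):

    import java.util.Arrays;
    import org.apache.spark.api.java.function.Function2;

    public class InlineVectorSum {
      public static void main(String[] args) throws Exception {
        // With Function2 as an interface, VectorSum's one method becomes a lambda.
        Function2<double[], double[], double[]> sum = (a, b) -> {
          double[] result = new double[a.length];
          for (int j = 0; j < a.length; j++) {
            result[j] = a[j] + b[j];
          }
          return result;
        };
        // Function2.call is declared to throw Exception, hence main's throws clause.
        System.out.println(Arrays.toString(sum.call(new double[]{1, 2}, new double[]{3, 4})));
      }
    }
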
http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
index 0dc8792..2d79727 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
@@ -98,7 +98,7 @@ public final class JavaKMeans {
     double tempDist;
     do {
       // allocate each vector to closest centroid
-      JavaPairRDD<Integer, Vector> closest = data.map(
+      JavaPairRDD<Integer, Vector> closest = data.mapToPair(
         new PairFunction<Vector, Integer, Vector>() {
           @Override
           public Tuple2<Integer, Vector> call(Vector vector) {

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 9eb1cad..a518fe2 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -108,7 +108,7 @@ public final class JavaLogQuery {
 
     JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
 
-    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
+    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
       @Override
       public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
         return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index a84245b..e53925b 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -42,7 +42,7 @@ import java.util.regex.Pattern;
 public final class JavaPageRank {
   private static final Pattern SPACES = Pattern.compile("\\s+");
 
-  private static class Sum extends Function2<Double, Double, Double> {
+  private static class Sum implements Function2<Double, Double, Double> {
     @Override
     public Double call(Double a, Double b) {
       return a + b;
@@ -66,7 +66,7 @@ public final class JavaPageRank {
     JavaRDD<String> lines = ctx.textFile(args[1], 1);
 
     // Loads all URLs from input file and initialize their neighbors.
-    JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
+    JavaPairRDD<String, List<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
       @Override
       public Tuple2<String, String> call(String s) {
         String[] parts = SPACES.split(s);
@@ -86,7 +86,7 @@ public final class JavaPageRank {
     for (int current = 0; current < Integer.parseInt(args[2]); current++) {
       // Calculates URL contributions to the rank of other URLs.
       JavaPairRDD<String, Double> contribs = links.join(ranks).values()
-        .flatMap(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
+        .flatMapToPair(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
           @Override
           public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
             List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/examples/JavaTC.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index 2ceb0fd..6cfe25c 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -50,7 +50,7 @@ public final class JavaTC {
     return new ArrayList<Tuple2<Integer, Integer>>(edges);
   }
 
-  static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
+  static class ProjectFn implements PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
       Integer, Integer> {
     static final ProjectFn INSTANCE = new ProjectFn();
 
@@ -77,7 +77,7 @@ public final class JavaTC {
     // the graph to obtain the path (x, z).
 
     // Because join() joins on keys, the edges are stored in reversed order.
-    JavaPairRDD<Integer, Integer> edges = tc.map(
+    JavaPairRDD<Integer, Integer> edges = tc.mapToPair(
       new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
         @Override
         public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
@@ -91,7 +91,7 @@ public final class JavaTC {
       oldCount = nextCount;
       // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
       // then project the result to obtain the new (x, z) paths.
-      tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct().cache();
+      tc = tc.union(tc.join(edges).mapToPair(ProjectFn.INSTANCE)).distinct().cache();
       nextCount = tc.count();
     } while (nextCount != oldCount);
 

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 6651f98..fa1b977 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -49,7 +49,7 @@ public final class JavaWordCount {
       }
     });
     
-    JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
+    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
       @Override
       public Tuple2<String, Integer> call(String s) {
         return new Tuple2<String, Integer>(s, 1);