You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/04 07:31:38 UTC

[3/3] git commit: [java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs

[java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs

Author: Prashant Sharma <pr...@imaginea.com>
Author: Patrick Wendell <pw...@gmail.com>

Closes #17 from ScrapCodes/java8-lambdas and squashes the following commits:

95850e6 [Patrick Wendell] Some doc improvements and build changes to the Java 8 patch.
85a954e [Prashant Sharma] Nit. import orderings.
673f7ac [Prashant Sharma] Added support for -java-home as well
80a13e8 [Prashant Sharma] Used fake class tag syntax
26eb3f6 [Prashant Sharma] Patrick's comments on PR.
35d8d79 [Prashant Sharma] Specified java 8 building in the docs
31d4cd6 [Prashant Sharma] Maven build to support -Pjava8-tests flag.
4ab87d3 [Prashant Sharma] Review feedback on the pr
c33dc2c [Prashant Sharma] SPARK-964, Java 8 API Support.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/181ec503
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/181ec503
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/181ec503

Branch: refs/heads/master
Commit: 181ec5030792a10f3ce77e997d0e2eda9bcd6139
Parents: b14ede7
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Mon Mar 3 22:31:30 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Mar 3 22:31:30 2014 -0800

----------------------------------------------------------------------
 .../java/function/DoubleFlatMapFunction.java    |  27 +
 .../spark/api/java/function/DoubleFunction.java |  27 +
 .../api/java/function/FlatMapFunction.java      |  27 +
 .../api/java/function/FlatMapFunction2.java     |  27 +
 .../spark/api/java/function/Function.java       |  29 +
 .../spark/api/java/function/Function2.java      |  27 +
 .../spark/api/java/function/Function3.java      |  27 +
 .../api/java/function/PairFlatMapFunction.java  |  30 +
 .../spark/api/java/function/PairFunction.java   |  29 +
 .../spark/api/java/function/VoidFunction.java   |  27 +
 .../apache/spark/api/java/JavaDoubleRDD.scala   |   2 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala |  67 +-
 .../org/apache/spark/api/java/JavaRDD.scala     |   2 +-
 .../org/apache/spark/api/java/JavaRDDLike.scala |  82 +-
 .../java/function/DoubleFlatMapFunction.scala   |  30 -
 .../api/java/function/DoubleFunction.scala      |  29 -
 .../api/java/function/FlatMapFunction.scala     |  27 -
 .../api/java/function/FlatMapFunction2.scala    |  27 -
 .../spark/api/java/function/Function.scala      |  31 -
 .../spark/api/java/function/Function2.scala     |  29 -
 .../spark/api/java/function/Function3.scala     |  28 -
 .../api/java/function/PairFlatMapFunction.scala |  36 -
 .../spark/api/java/function/PairFunction.scala  |  33 -
 .../spark/api/java/function/VoidFunction.scala  |  33 -
 .../api/java/function/WrappedFunction1.scala    |  32 -
 .../api/java/function/WrappedFunction2.scala    |  32 -
 .../api/java/function/WrappedFunction3.scala    |  34 -
 .../java/org/apache/spark/JavaAPISuite.java     |  38 +-
 dev/run-tests                                   |  10 +
 docs/building-with-maven.md                     |  12 +
 docs/java-programming-guide.md                  |  56 +-
 .../org/apache/spark/examples/JavaHdfsLR.java   |   6 +-
 .../org/apache/spark/examples/JavaKMeans.java   |   2 +-
 .../org/apache/spark/examples/JavaLogQuery.java |   2 +-
 .../org/apache/spark/examples/JavaPageRank.java |   6 +-
 .../java/org/apache/spark/examples/JavaTC.java  |   6 +-
 .../apache/spark/examples/JavaWordCount.java    |   2 +-
 .../apache/spark/mllib/examples/JavaALS.java    |   4 +-
 .../apache/spark/mllib/examples/JavaKMeans.java |   2 +-
 .../org/apache/spark/mllib/examples/JavaLR.java |   2 +-
 .../streaming/examples/JavaKafkaWordCount.java  |   2 +-
 .../examples/JavaNetworkWordCount.java          |   2 +-
 .../streaming/examples/JavaQueueStream.java     |   2 +-
 .../spark/streaming/zeromq/ZeroMQUtils.scala    |   6 +-
 extras/README.md                                |   1 +
 extras/java8-tests/README.md                    |  24 +
 extras/java8-tests/pom.xml                      | 151 ++++
 .../java/org/apache/spark/Java8APISuite.java    | 391 +++++++++
 .../apache/spark/streaming/Java8APISuite.java   | 841 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |  28 +
 pom.xml                                         |  25 +
 project/SparkBuild.scala                        |  20 +-
 sbt/sbt-launch-lib.bash                         |  11 +-
 .../spark/streaming/api/java/JavaDStream.scala  |   2 +-
 .../streaming/api/java/JavaDStreamLike.scala    | 119 ++-
 .../streaming/api/java/JavaPairDStream.scala    | 101 +--
 .../api/java/JavaStreamingContext.scala         |   4 +-
 .../apache/spark/streaming/JavaAPISuite.java    |  62 +-
 58 files changed, 2083 insertions(+), 688 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
new file mode 100644
index 0000000..57fd0a7
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function that returns zero or more records of type Double from each input record.
+ */
+public interface DoubleFlatMapFunction<T> extends Serializable {
+  public Iterable<Double> call(T t) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
new file mode 100644
index 0000000..150144e
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function that returns Doubles, and can be used to construct DoubleRDDs.
+ */
+public interface DoubleFunction<T> extends Serializable {
+  public double call(T t) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
new file mode 100644
index 0000000..fa75842
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function that returns zero or more output records from each input record.
+ */
+public interface FlatMapFunction<T, R> extends Serializable {
+  public Iterable<R> call(T t) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
new file mode 100644
index 0000000..d1fdec0
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function that takes two inputs and returns zero or more output records.
+ */
+public interface FlatMapFunction2<T1, T2, R> extends Serializable {
+  public Iterable<R> call(T1 t1, T2 t2) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function.java b/core/src/main/java/org/apache/spark/api/java/function/Function.java
new file mode 100644
index 0000000..d00551b
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for functions whose return types do not create special RDDs. PairFunction and
+ * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
+ * when mapping RDDs of other types.
+ */
+public interface Function<T1, R> extends Serializable {
+  public R call(T1 v1) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/Function2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function2.java b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
new file mode 100644
index 0000000..793caaa
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A two-argument function that takes arguments of type T1 and T2 and returns an R.
+ */
+public interface Function2<T1, T2, R> extends Serializable {
+  public R call(T1 v1, T2 v2) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/Function3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function3.java b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
new file mode 100644
index 0000000..b4151c3
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
+ */
+public interface Function3<T1, T2, T3, R> extends Serializable {
+  public R call(T1 v1, T2 v2, T3 v3) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
new file mode 100644
index 0000000..691ef2e
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+import scala.Tuple2;
+
+/**
+ * A function that returns zero or more key-value pair records from each input record. The
+ * key-value pairs are represented as scala.Tuple2 objects.
+ */
+public interface PairFlatMapFunction<T, K, V> extends Serializable {
+  public Iterable<Tuple2<K, V>> call(T t) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
new file mode 100644
index 0000000..abd9bcc
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+import scala.Tuple2;
+
+/**
+ * A function that returns key-value pairs ({@code Tuple2<K, V>}), used to construct PairRDDs.
+ */
+public interface PairFunction<T, K, V> extends Serializable {
+  public Tuple2<K, V> call(T t) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
new file mode 100644
index 0000000..2a10435
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function with no return value.
+ */
+public interface VoidFunction<T> extends Serializable {
+  public void call(T t) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index 0710444..d178706 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -83,7 +83,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
   def filter(f: JFunction[JDouble, java.lang.Boolean]): JavaDoubleRDD =
-    fromRDD(srdd.filter(x => f(x).booleanValue()))
+    fromRDD(srdd.filter(x => f.call(x).booleanValue()))
 
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 3f67290..857626f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
 import org.apache.spark.SparkContext.rddToPairRDDFunctions
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
 import org.apache.spark.storage.StorageLevel
@@ -89,7 +89,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
   def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
+    new JavaPairRDD[K, V](rdd.filter(x => f.call(x).booleanValue()))
 
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
@@ -165,9 +165,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Simplified version of combineByKey that hash-partitions the output RDD.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
-    mergeValue: JFunction2[C, V, C],
-    mergeCombiners: JFunction2[C, C, C],
-    numPartitions: Int): JavaPairRDD[K, C] =
+      mergeValue: JFunction2[C, V, C],
+      mergeCombiners: JFunction2[C, C, C],
+      numPartitions: Int): JavaPairRDD[K, C] =
     combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
 
   /**
@@ -442,7 +442,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    */
   def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: V) => f.apply(x).asScala
+    def fn = (x: V) => f.call(x).asScala
     implicit val ctag: ClassTag[U] = fakeClassTag
     fromRDD(rdd.flatMapValues(fn))
   }
@@ -511,49 +511,49 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    conf: JobConf) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      conf: JobConf) {
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
   }
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F]) {
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
   }
 
   /** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    codec: Class[_ <: CompressionCodec]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      codec: Class[_ <: CompressionCodec]) {
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec)
   }
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    conf: Configuration) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      conf: Configuration) {
     rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
   }
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F]) {
     rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
   }
 
@@ -700,6 +700,15 @@ object JavaPairRDD {
 
   implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
 
+  private[spark]
+  implicit def toScalaFunction2[T1, T2, R](fun: JFunction2[T1, T2, R]): Function2[T1, T2, R] = {
+    (x: T1, x1: T2) => fun.call(x, x1)
+  }
+
+  private[spark] implicit def toScalaFunction[T, R](fun: JFunction[T, R]): T => R = x => fun.call(x)
+
+  private[spark]
+  implicit def pairFunToScalaFun[A, B, C](x: PairFunction[A, B, C]): A => (B, C) = y => x.call(y)
 
   /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
   def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index d7ce8fd..e973c46 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -70,7 +70,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
   def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
-    wrapRDD(rdd.filter((x => f(x).booleanValue())))
+    wrapRDD(rdd.filter((x => f.call(x).booleanValue())))
 
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 729668f..af0114b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -67,7 +67,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to all elements of this RDD.
    */
   def map[R](f: JFunction[T, R]): JavaRDD[R] =
-    new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType())
+    new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
@@ -82,15 +82,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to all elements of this RDD.
    */
-  def map[R](f: DoubleFunction[T]): JavaDoubleRDD =
-    new JavaDoubleRDD(rdd.map(x => f(x).doubleValue()))
+  def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = {
+    new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue()))
+  }
 
   /**
    * Return a new RDD by applying a function to all elements of this RDD.
    */
-  def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    val ctag = implicitly[ClassTag[Tuple2[K2, V2]]]
-    new JavaPairRDD(rdd.map(f)(ctag))(f.keyType(), f.valueType())
+  def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
+    def cm = implicitly[ClassTag[(K2, V2)]]
+    new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
   /**
@@ -99,17 +100,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
-    JavaRDD.fromRDD(rdd.flatMap(fn)(f.elementType()))(f.elementType())
+    def fn = (x: T) => f.call(x).asScala
+    JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
   /**
    *  Return a new RDD by first applying a function to all elements of this
    *  RDD, and then flattening the results.
    */
-  def flatMap(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
+  def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
+    def fn = (x: T) => f.call(x).asScala
     new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue()))
   }
 
@@ -117,19 +118,19 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    *  Return a new RDD by first applying a function to all elements of this
    *  RDD, and then flattening the results.
    */
-  def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
+  def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
-    val ctag = implicitly[ClassTag[Tuple2[K2, V2]]]
-    JavaPairRDD.fromRDD(rdd.flatMap(fn)(ctag))(f.keyType(), f.valueType())
+    def fn = (x: T) => f.call(x).asScala
+    def cm = implicitly[ClassTag[(K2, V2)]]
+    JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    JavaRDD.fromRDD(rdd.mapPartitions(fn)(f.elementType()))(f.elementType())
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
   /**
@@ -137,52 +138,53 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
       preservesPartitioning: Boolean): JavaRDD[U] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType())
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    JavaRDD.fromRDD(
+      rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
   /**
-    * Return a new RDD by applying a function to each partition of this RDD.
+   * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+  def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
     new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
   }
 
   /**
-    * Return a new RDD by applying a function to each partition of this RDD.
+   * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
   JavaPairRDD[K2, V2] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
-
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]],
-    preservesPartitioning: Boolean): JavaDoubleRDD = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+  def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
+      preservesPartitioning: Boolean): JavaDoubleRDD = {
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
     new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
-      .map((x: java.lang.Double) => x.doubleValue()))
+      .map(x => x.doubleValue()))
   }
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
       preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
+    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    JavaPairRDD.fromRDD(
+      rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
   /**
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
-    rdd.foreachPartition((x => f(asJavaIterator(x))))
+    rdd.foreachPartition((x => f.call(asJavaIterator(x))))
   }
 
   /**
@@ -205,7 +207,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
-    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))
+    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
   }
 
   /**
@@ -215,7 +217,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
-    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))
+    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
   }
 
   /**
@@ -255,9 +257,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       other: JavaRDDLike[U, _],
       f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
     def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
-      f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
+      f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
     JavaRDD.fromRDD(
-      rdd.zipPartitions(other.rdd)(fn)(other.classTag, f.elementType()))(f.elementType())
+      rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
   }
 
   // Actions (launch a job to return a value to the user program)
@@ -266,7 +268,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Applies a function f to all elements of this RDD.
    */
   def foreach(f: VoidFunction[T]) {
-    val cleanF = rdd.context.clean(f)
+    val cleanF = rdd.context.clean((x: T) => f.call(x))
     rdd.foreach(cleanF)
   }
 
@@ -320,7 +322,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U],
     combOp: JFunction2[U, U, U]): U =
-    rdd.aggregate(zeroValue)(seqOp, combOp)(seqOp.returnType)
+    rdd.aggregate(zeroValue)(seqOp, combOp)(fakeClassTag[U])
 
   /**
    * Return the number of elements in the RDD.

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
deleted file mode 100644
index 7500a89..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import java.lang.{Double => JDouble, Iterable => JIterable}
-
-/**
- * A function that returns zero or more records of type Double from each input record.
- */
-// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
-abstract class DoubleFlatMapFunction[T] extends WrappedFunction1[T, JIterable[JDouble]]
-   with Serializable {
-   // Intentionally left blank
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
deleted file mode 100644
index 2cdf2e9..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import java.lang.{Double => JDouble}
-
-/**
- * A function that returns Doubles, and can be used to construct DoubleRDDs.
- */
-// DoubleFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and DoubleFunction.
-abstract class DoubleFunction[T] extends WrappedFunction1[T, JDouble] with Serializable {
-    // Intentionally left blank
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
deleted file mode 100644
index bdb01f7..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.reflect.ClassTag
-
-/**
- * A function that returns zero or more output records from each input record.
- */
-abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
-  def elementType(): ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]]
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
deleted file mode 100644
index aae1349..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.reflect.ClassTag
-
-/**
- * A function that takes two inputs and returns zero or more output records.
- */
-abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
-  def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.scala b/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
deleted file mode 100644
index a5e1701..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
-
-/**
- * Base class for functions whose return types do not create special RDDs. PairFunction and
- * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
- * when mapping RDDs of other types.
- */
-abstract class Function[T, R] extends WrappedFunction1[T, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala b/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
deleted file mode 100644
index fa3616c..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
-
-/**
- * A two-argument function that takes arguments of type T1 and T2 and returns an R.
- */
-abstract class Function2[T1, T2, R] extends WrappedFunction2[T1, T2, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala b/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
deleted file mode 100644
index 4515289..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import org.apache.spark.api.java.JavaSparkContext
-import scala.reflect.ClassTag
-
-/**
- * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
- */
-abstract class Function3[T1, T2, T3, R] extends WrappedFunction3[T1, T2, T3, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
deleted file mode 100644
index 8467bbb..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import java.lang.{Iterable => JIterable}
-import org.apache.spark.api.java.JavaSparkContext
-import scala.reflect.ClassTag
-
-/**
- * A function that returns zero or more key-value pair records from each input record. The
- * key-value pairs are represented as scala.Tuple2 objects.
- */
-// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and PairFlatMapFunction.
-abstract class PairFlatMapFunction[T, K, V] extends WrappedFunction1[T, JIterable[(K, V)]]
-  with Serializable {
-
-  def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag
-
-  def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
deleted file mode 100644
index d0ba0b6..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
-
-/**
- * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
- */
-// PairFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and PairFunction.
-abstract class PairFunction[T, K, V] extends WrappedFunction1[T, (K, V)] with Serializable {
-
-  def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag
-
-  def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
deleted file mode 100644
index ea94313..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-/**
- * A function with no return value.
- */
-// This allows Java users to write void methods without having to return Unit.
-abstract class VoidFunction[T] extends Serializable {
-  @throws(classOf[Exception])
-  def call(t: T) : Unit
-}
-
-// VoidFunction cannot extend AbstractFunction1 (because that would force users to explicitly
-// return Unit), so it is implicitly converted to a Function1[T, Unit]:
-object VoidFunction {
-  implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f.call(x))
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
deleted file mode 100644
index cfe694f..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.runtime.AbstractFunction1
-
-/**
- * Subclass of Function1 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
-  @throws(classOf[Exception])
-  def call(t: T): R
-
-  final def apply(t: T): R = call(t)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
deleted file mode 100644
index eb9277c..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.runtime.AbstractFunction2
-
-/**
- * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
-  @throws(classOf[Exception])
-  def call(t1: T1, t2: T2): R
-
-  final def apply(t1: T1, t2: T2): R = call(t1, t2)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
deleted file mode 100644
index d314dbd..0000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.runtime.AbstractFunction3
-
-/**
- * Subclass of Function3 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction3[T1, T2, T3, R]
-  extends AbstractFunction3[T1, T2, T3, R] {
-  @throws(classOf[Exception])
-  def call(t1: T1, t2: T2, t3: T3): R
-
-  final def apply(t1: T1, t2: T2, t3: T3): R = call(t1, t2, t3)
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index aa5079c..c7d0e2d 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -386,14 +386,14 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void map() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+    JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
       @Override
-      public Double call(Integer x) {
+      public double call(Integer x) {
         return 1.0 * x;
       }
     }).cache();
     doubles.collect();
-    JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
+    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(new PairFunction<Integer, Integer, Integer>() {
       @Override
       public Tuple2<Integer, Integer> call(Integer x) {
         return new Tuple2<Integer, Integer>(x, x);
@@ -422,7 +422,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals("Hello", words.first());
     Assert.assertEquals(11, words.count());
 
-    JavaPairRDD<String, String> pairs = rdd.flatMap(
+    JavaPairRDD<String, String> pairs = rdd.flatMapToPair(
       new PairFlatMapFunction<String, String, String>() {
 
         @Override
@@ -436,7 +436,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
     Assert.assertEquals(11, pairs.count());
 
-    JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction<String>() {
+    JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
       @Override
       public Iterable<Double> call(String s) {
         List<Double> lengths = new LinkedList<Double>();
@@ -459,7 +459,7 @@ public class JavaAPISuite implements Serializable {
       JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
 
       // Regression test for SPARK-668:
-      JavaPairRDD<String, Integer> swapped = pairRDD.flatMap(
+      JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
           new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
           @Override
           public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) throws Exception {
@@ -469,7 +469,7 @@ public class JavaAPISuite implements Serializable {
       swapped.collect();
 
       // There was never a bug here, but it's worth testing:
-      pairRDD.map(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
+      pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
           @Override
           public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
               return item.swap();
@@ -592,7 +592,7 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
         return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -601,7 +601,7 @@ public class JavaAPISuite implements Serializable {
 
     // Try reading the output back as an object file
     JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
-      Text.class).map(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
+      Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
       @Override
       public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
         return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString());
@@ -622,7 +622,7 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
         return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -653,7 +653,7 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
         return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -713,7 +713,7 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
         return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -743,7 +743,7 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
       public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
         return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -766,9 +766,9 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void zip() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+    JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
       @Override
-      public Double call(Integer x) {
+      public double call(Integer x) {
         return 1.0 * x;
       }
     });
@@ -893,13 +893,13 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void mapOnPairRDD() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(new PairFunction<Integer, Integer, Integer>() {
       @Override
       public Tuple2<Integer, Integer> call(Integer i) throws Exception {
         return new Tuple2<Integer, Integer>(i, i % 2);
       }
     });
-    JavaPairRDD<Integer, Integer> rdd3 = rdd2.map(
+    JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(
         new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
       @Override
       public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
@@ -919,7 +919,7 @@ public class JavaAPISuite implements Serializable {
   public void collectPartitions() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
 
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(new PairFunction<Integer, Integer, Integer>() {
       @Override
       public Tuple2<Integer, Integer> call(Integer i) throws Exception {
         return new Tuple2<Integer, Integer>(i, i % 2);
@@ -984,7 +984,7 @@ public class JavaAPISuite implements Serializable {
   public void collectAsMapWithIntArrayValues() {
     // Regression test for SPARK-1040
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
-    JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
+    JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(new PairFunction<Integer, Integer, int[]>() {
       @Override
       public Tuple2<Integer, int[]> call(Integer x) throws Exception {
         return new Tuple2<Integer, int[]>(x, new int[] { x });

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/dev/run-tests
----------------------------------------------------------------------
diff --git a/dev/run-tests b/dev/run-tests
index d65a397..cf0b940 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -27,6 +27,16 @@ rm -rf ./work
 # Fail fast
 set -e
 
+if test -x "$JAVA_HOME/bin/java"; then
+    declare java_cmd="$JAVA_HOME/bin/java"
+else 
+    declare java_cmd=java
+fi
+
+JAVA_VERSION=$($java_cmd -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
+[ "$JAVA_VERSION" -ge 18 ] && echo "" || echo "[Warn] Java 8 tests will not run, because JDK version is < 1.8."
+
+
 echo "========================================================================="
 echo "Running Scala style checks"
 echo "========================================================================="

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/docs/building-with-maven.md
----------------------------------------------------------------------
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index ded1292..a982c4d 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -25,6 +25,8 @@ If you don't run this, you may see errors like the following:
 
 You can fix this by setting the `MAVEN_OPTS` variable as discussed before.
 
+*Note: For Java 1.8 and above this step is not required.*
+
 ## Specifying the Hadoop version ##
 
 Because HDFS is not protocol-compatible across versions, if you want to read from HDFS, you'll need to build Spark against the specific HDFS version in your environment. You can do this through the "hadoop.version" property. If unset, Spark will build against Hadoop 1.0.4 by default.
@@ -76,3 +78,13 @@ The maven build includes support for building a Debian package containing the as
     $ mvn -Pdeb -DskipTests clean package
 
 The debian package can then be found under assembly/target. We added the short commit hash to the file name so that we can distinguish individual packages built for SNAPSHOT versions.
+
+## Running Java 8 test suites
+
+To run only the Java 8 tests and nothing else:
+
+    $ mvn install -DskipTests -Pjava8-tests
+    
+The Java 8 tests run whenever the `-Pjava8-tests` profile is enabled; they run even when `-DskipTests` is specified.
+For these tests to run, your system must have a JDK 8 installation.
+If you have JDK 8 installed but it is not the system default, you can set `JAVA_HOME` to point to JDK 8 before running the tests.

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/docs/java-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md
index 5c73dbb..6632360 100644
--- a/docs/java-programming-guide.md
+++ b/docs/java-programming-guide.md
@@ -21,15 +21,21 @@ operations (e.g. map) and handling RDDs of different types, as discussed next.
 
 There are a few key differences between the Java and Scala APIs:
 
-* Java does not support anonymous or first-class functions, so functions must
-  be implemented by extending the
+* Java does not support anonymous or first-class functions, so functions are passed
+  using anonymous classes that implement the
   [`org.apache.spark.api.java.function.Function`](api/core/index.html#org.apache.spark.api.java.function.Function),
   [`Function2`](api/core/index.html#org.apache.spark.api.java.function.Function2), etc.
-  classes.
+  interfaces.
 * To maintain type safety, the Java API defines specialized Function and RDD
   classes for key-value pairs and doubles. For example, 
   [`JavaPairRDD`](api/core/index.html#org.apache.spark.api.java.JavaPairRDD)
   stores key-value pairs.
+* Some methods have separate variants depending on the return type of the
+  function (anonymous class or lambda expression) passed to them.
+  For example, `mapToPair(...)` and `flatMapToPair(...)` return a
+  [`JavaPairRDD`](api/core/index.html#org.apache.spark.api.java.JavaPairRDD);
+  similarly, `mapToDouble(...)` and `flatMapToDouble(...)` return a
+  [`JavaDoubleRDD`](api/core/index.html#org.apache.spark.api.java.JavaDoubleRDD).
 * RDD methods like `collect()` and `countByKey()` return Java collections types,
   such as `java.util.List` and `java.util.Map`.
 * Key-value pairs, which are simply written as `(key, value)` in Scala, are represented
@@ -53,10 +59,10 @@ each specialized RDD class, so filtering a `PairRDD` returns a new `PairRDD`,
 etc (this acheives the "same-result-type" principle used by the [Scala collections
 framework](http://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html)).
 
-## Function Classes
+## Function Interfaces
 
-The following table lists the function classes used by the Java API.  Each
-class has a single abstract method, `call()`, that must be implemented.
+The following table lists the function interfaces used by the Java API.  Each
+interface has a single abstract method, `call()`, that must be implemented.
 
 <table class="table">
 <tr><th>Class</th><th>Function Type</th></tr>
@@ -78,7 +84,6 @@ RDD [storage level](scala-programming-guide.html#rdd-persistence) constants, suc
 declared in the [org.apache.spark.api.java.StorageLevels](api/core/index.html#org.apache.spark.api.java.StorageLevels) class. To
 define your own storage level, you can use StorageLevels.create(...). 
 
-
 # Other Features
 
 The Java API supports other Spark features, including
@@ -86,6 +91,21 @@ The Java API supports other Spark features, including
 [broadcast variables](scala-programming-guide.html#broadcast-variables), and
 [caching](scala-programming-guide.html#rdd-persistence).
 
+# Upgrading From Pre-1.0 Versions of Spark
+
+In version 1.0 of Spark the Java API was refactored to better support Java 8
+lambda expressions. Users upgrading from older versions of Spark should note
+the following changes:
+
+* All `org.apache.spark.api.java.function.*` classes have been changed from abstract
+  classes to interfaces. This means that concrete implementations of these
+  `Function` interfaces will need to use `implements` rather than `extends`.
+* Certain transformation functions now have multiple versions depending
+  on the return type. In Spark core, the map functions (map, flatMap,
+  mapPartitions) have type-specific versions, e.g.
+  [`mapToPair`](api/core/index.html#org.apache.spark.api.java.JavaRDD@mapToPair[K2,V2](f:org.apache.spark.api.java.function.PairFunction[T,K2,V2]):org.apache.spark.api.java.JavaPairRDD[K2,V2])
+  and [`mapToDouble`](api/core/index.html#org.apache.spark.api.java.JavaRDD@mapToDouble[R](f:org.apache.spark.api.java.function.DoubleFunction[T]):org.apache.spark.api.java.JavaDoubleRDD).
+  Spark Streaming also uses the same approach, e.g. [`transformToPair`](api/streaming/index.html#org.apache.spark.streaming.api.java.JavaDStream@transformToPair[K2,V2](transformFunc:org.apache.spark.api.java.function.Function[R,org.apache.spark.api.java.JavaPairRDD[K2,V2]]):org.apache.spark.streaming.api.java.JavaPairDStream[K2,V2]).
 
 # Example
 
@@ -127,11 +147,20 @@ class Split extends FlatMapFunction<String, String> {
 JavaRDD<String> words = lines.flatMap(new Split());
 {% endhighlight %}
 
+Java 8+ users can also write the above `FlatMapFunction` in a more concise way using 
+a lambda expression:
+
+{% highlight java %}
+JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")));
+{% endhighlight %}
+
+In Java 8, this lambda syntax can replace any anonymous class that implements an interface with a single abstract method, such as Spark's function interfaces.
+
 Continuing with the word count example, we map each word to a `(word, 1)` pair:
 
 {% highlight java %}
 import scala.Tuple2;
-JavaPairRDD<String, Integer> ones = words.map(
+JavaPairRDD<String, Integer> ones = words.mapToPair(
   new PairFunction<String, String, Integer>() {
     public Tuple2<String, Integer> call(String s) {
       return new Tuple2(s, 1);
@@ -140,7 +169,7 @@ JavaPairRDD<String, Integer> ones = words.map(
 );
 {% endhighlight %}
 
-Note that `map` was passed a `PairFunction<String, String, Integer>` and
+Note that `mapToPair` was passed a `PairFunction<String, String, Integer>` and
 returned a `JavaPairRDD<String, Integer>`.
 
 To finish the word count program, we will use `reduceByKey` to count the
@@ -164,7 +193,7 @@ possible to chain the RDD transformations, so the word count example could also
 be written as:
 
 {% highlight java %}
-JavaPairRDD<String, Integer> counts = lines.flatMap(
+JavaPairRDD<String, Integer> counts = lines.flatMapToPair(
     ...
   ).map(
     ...
@@ -180,10 +209,11 @@ just a matter of style.
 
 We currently provide documentation for the Java API as Scaladoc, in the
 [`org.apache.spark.api.java` package](api/core/index.html#org.apache.spark.api.java.package), because
-some of the classes are implemented in Scala. The main downside is that the types and function
+some of the classes are implemented in Scala. It is important to note that the types and function
 definitions show Scala syntax (for example, `def reduce(func: Function2[T, T]): T` instead of
-`T reduce(Function2<T, T> func)`). 
-We hope to generate documentation with Java-style syntax in the future.
+`T reduce(Function2<T, T> func)`). In addition, Java interfaces are shown with the Scala `trait`
+keyword. We hope to generate documentation with Java-style syntax in the future to
+avoid these quirks.
 
 
 # Where to Go from Here

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index d552c47..6b49244 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -45,7 +45,7 @@ public final class JavaHdfsLR {
     double y;
   }
 
-  static class ParsePoint extends Function<String, DataPoint> {
+  static class ParsePoint implements Function<String, DataPoint> {
     private static final Pattern SPACE = Pattern.compile(" ");
 
     @Override
@@ -60,7 +60,7 @@ public final class JavaHdfsLR {
     }
   }
 
-  static class VectorSum extends Function2<double[], double[], double[]> {
+  static class VectorSum implements Function2<double[], double[], double[]> {
     @Override
     public double[] call(double[] a, double[] b) {
       double[] result = new double[D];
@@ -71,7 +71,7 @@ public final class JavaHdfsLR {
     }
   }
 
-  static class ComputeGradient extends Function<DataPoint, double[]> {
+  static class ComputeGradient implements Function<DataPoint, double[]> {
     private final double[] weights;
 
     ComputeGradient(double[] weights) {

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
index 0dc8792..2d79727 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
@@ -98,7 +98,7 @@ public final class JavaKMeans {
     double tempDist;
     do {
       // allocate each vector to closest centroid
-      JavaPairRDD<Integer, Vector> closest = data.map(
+      JavaPairRDD<Integer, Vector> closest = data.mapToPair(
         new PairFunction<Vector, Integer, Vector>() {
           @Override
           public Tuple2<Integer, Vector> call(Vector vector) {

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 9eb1cad..a518fe2 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -108,7 +108,7 @@ public final class JavaLogQuery {
 
     JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
 
-    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
+    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
       @Override
       public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
         return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index a84245b..e53925b 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -42,7 +42,7 @@ import java.util.regex.Pattern;
 public final class JavaPageRank {
   private static final Pattern SPACES = Pattern.compile("\\s+");
 
-  private static class Sum extends Function2<Double, Double, Double> {
+  private static class Sum implements Function2<Double, Double, Double> {
     @Override
     public Double call(Double a, Double b) {
       return a + b;
@@ -66,7 +66,7 @@ public final class JavaPageRank {
     JavaRDD<String> lines = ctx.textFile(args[1], 1);
 
     // Loads all URLs from input file and initialize their neighbors.
-    JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
+    JavaPairRDD<String, List<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
       @Override
       public Tuple2<String, String> call(String s) {
         String[] parts = SPACES.split(s);
@@ -86,7 +86,7 @@ public final class JavaPageRank {
     for (int current = 0; current < Integer.parseInt(args[2]); current++) {
       // Calculates URL contributions to the rank of other URLs.
       JavaPairRDD<String, Double> contribs = links.join(ranks).values()
-        .flatMap(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
+        .flatMapToPair(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
           @Override
           public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
             List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/examples/JavaTC.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index 2ceb0fd..6cfe25c 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -50,7 +50,7 @@ public final class JavaTC {
     return new ArrayList<Tuple2<Integer, Integer>>(edges);
   }
 
-  static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
+  static class ProjectFn implements PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
       Integer, Integer> {
     static final ProjectFn INSTANCE = new ProjectFn();
 
@@ -77,7 +77,7 @@ public final class JavaTC {
     // the graph to obtain the path (x, z).
 
     // Because join() joins on keys, the edges are stored in reversed order.
-    JavaPairRDD<Integer, Integer> edges = tc.map(
+    JavaPairRDD<Integer, Integer> edges = tc.mapToPair(
       new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
         @Override
         public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
@@ -91,7 +91,7 @@ public final class JavaTC {
       oldCount = nextCount;
       // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
       // then project the result to obtain the new (x, z) paths.
-      tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct().cache();
+      tc = tc.union(tc.join(edges).mapToPair(ProjectFn.INSTANCE)).distinct().cache();
       nextCount = tc.count();
     } while (nextCount != oldCount);
 

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 6651f98..fa1b977 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -49,7 +49,7 @@ public final class JavaWordCount {
       }
     });
     
-    JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
+    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
       @Override
       public Tuple2<String, Integer> call(String s) {
         return new Tuple2<String, Integer>(s, 1);