You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/09/26 22:49:17 UTC

[2/7] git commit: Make mapPartitionsWithIndex work with JavaRDDs

Make mapPartitionsWithIndex work with JavaRDDs


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bfcddf47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bfcddf47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bfcddf47

Branch: refs/heads/master
Commit: bfcddf4700023f53d5eed92ef8ef75c072af3ced
Parents: 74f710f
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Sat Sep 14 15:53:42 2013 -0700
Committer: Holden Karau <ho...@pigscanfly.ca>
Committed: Sat Sep 14 15:53:42 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bfcddf47/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 9ad175e..264c4bc 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -71,9 +71,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
    * of the original partition.
    */
-  def mapPartitionsWithIndex(f: JFunction2[Int, T, R],
+  def mapPartitionsWithIndex[R: ClassManifest](f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]],
 			     preservesPartitioning: Boolean = false): JavaRDD[R] =
-    new JavaRDD(MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning))
+    new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
+					   preservesPartitioning))
 
   /**
    * Return a new RDD by applying a function to all elements of this RDD.