You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/01/23 20:14:30 UTC

[2/9] git commit: Added mapPartitions method to JavaRDD.

Added mapPartitions method to JavaRDD.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/dbadc6b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/dbadc6b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/dbadc6b9

Branch: refs/heads/master
Commit: dbadc6b994ff54f86b726c71fa08837a6b1e7238
Parents: aae8a01
Author: eklavya <sr...@gmail.com>
Authored: Mon Jan 13 17:56:10 2014 +0530
Committer: eklavya <sr...@gmail.com>
Committed: Mon Jan 13 17:56:10 2014 +0530

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/api/java/JavaRDD.scala | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dbadc6b9/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 6c91eda..568ae15 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -21,8 +21,10 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap}
 import org.apache.spark.storage.StorageLevel
+import java.util.{Iterator => JIterator}
+import scala.collection.JavaConversions._
 
 class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
 JavaRDDLike[T, JavaRDD[T]] {
@@ -138,6 +140,15 @@ JavaRDDLike[T, JavaRDD[T]] {
   def setGenerator(_generator: String) = {
     rdd.generator = _generator
   }
+
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
+  def mapPartitions[U: ClassTag](
+      f: JFMap[JIterator[T], U], preservesPartitioning: Boolean = false): JavaRDD[U] = {
+    rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
+  }
+
 }
 
 object JavaRDD {