You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/01/23 20:14:35 UTC

[7/9] git commit: Modifications as suggested in PR feedback

Modifications as suggested in PR feedback:

- add more variants of mapPartitions to JavaRDDLike
- move setGenerator from JavaRDD to JavaRDDLike
- clean up imports in JavaRDD


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1442cd5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1442cd5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1442cd5d

Branch: refs/heads/master
Commit: 1442cd5d5099de71747b1cccf463b94fdedcda1f
Parents: e922973
Author: Saurabh Rawat <sr...@gmail.com>
Authored: Tue Jan 14 14:19:02 2014 +0530
Committer: Saurabh Rawat <sr...@gmail.com>
Committed: Tue Jan 14 14:19:02 2014 +0530

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDD.scala     |  9 +-------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 22 ++++++++++++++++++++
 2 files changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1442cd5d/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index e687bbd..7d48ce0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -21,10 +21,8 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap, VoidFunction}
+import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.storage.StorageLevel
-import java.util.{Iterator => JIterator}
-import scala.collection.JavaConversions._
 
 class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
 JavaRDDLike[T, JavaRDD[T]] {
@@ -135,11 +133,6 @@ JavaRDDLike[T, JavaRDD[T]] {
     rdd.setName(name)
     this
   }
-
-  /** Reset generator*/
-  def setGenerator(_generator: String) = {
-    rdd.setGenerator(_generator)
-  }
 }
 
 object JavaRDD {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1442cd5d/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index eb8e34e..808c907 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -157,6 +157,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
+  def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = {
+    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+    new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning).map((x: java.lang.Double) => x.doubleValue()))
+  }
+
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
+  def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean):
+  JavaPairRDD[K2, V2] = {
+    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+    JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
+  }
+
+  /**
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
@@ -476,4 +493,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
 
   def name(): String = rdd.name
+
+  /** Reset generator */
+  def setGenerator(_generator: String) = {
+    rdd.setGenerator(_generator)
+  }
 }