You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/01/23 20:14:35 UTC
[7/9] git commit: Modifications as suggested in PR feedback:
Modifications as suggested in PR feedback:
- more variants of mapPartitions added to JavaRDDLike
- move setGenerator to JavaRDDLike
- clean up
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1442cd5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1442cd5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1442cd5d
Branch: refs/heads/master
Commit: 1442cd5d5099de71747b1cccf463b94fdedcda1f
Parents: e922973
Author: Saurabh Rawat <sr...@gmail.com>
Authored: Tue Jan 14 14:19:02 2014 +0530
Committer: Saurabh Rawat <sr...@gmail.com>
Committed: Tue Jan 14 14:19:02 2014 +0530
----------------------------------------------------------------------
.../org/apache/spark/api/java/JavaRDD.scala | 9 +-------
.../org/apache/spark/api/java/JavaRDDLike.scala | 22 ++++++++++++++++++++
2 files changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1442cd5d/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index e687bbd..7d48ce0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -21,10 +21,8 @@ import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.rdd.RDD
-import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap, VoidFunction}
+import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.storage.StorageLevel
-import java.util.{Iterator => JIterator}
-import scala.collection.JavaConversions._
class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
JavaRDDLike[T, JavaRDD[T]] {
@@ -135,11 +133,6 @@ JavaRDDLike[T, JavaRDD[T]] {
rdd.setName(name)
this
}
-
- /** Reset generator*/
- def setGenerator(_generator: String) = {
- rdd.setGenerator(_generator)
- }
}
object JavaRDD {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1442cd5d/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index eb8e34e..808c907 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -157,6 +157,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
}
/**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
+ def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning).map((x: java.lang.Double) => x.doubleValue()))
+ }
+
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
+ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean):
+ JavaPairRDD[K2, V2] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
+ }
+
+ /**
* Applies a function f to each partition of this RDD.
*/
def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
@@ -476,4 +493,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
def name(): String = rdd.name
+
+ /** Reset generator */
+ def setGenerator(_generator: String) = {
+ rdd.setGenerator(_generator)
+ }
}