You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Mohit Jaggi (JIRA)" <ji...@apache.org> on 2014/09/17 06:02:33 UTC
[jira] [Commented] (SPARK-3489) support rdd.zip(rdd1, rdd2,...)
with variable number of rdds as params
[ https://issues.apache.org/jira/browse/SPARK-3489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14136738#comment-14136738 ]
Mohit Jaggi commented on SPARK-3489:
------------------------------------
Proposed diff ---
MohitMacBook:spark mohit$ git diff
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a9b905b..2c9f034 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -711,6 +711,21 @@ abstract class RDD[T: ClassTag](
}
}
}
+
+  /**
+   * Zips this RDD with a sequence of other RDDs, returning an RDD of arrays: the first array
+   * holds the first element supplied by each partition iterator, the second array holds the
+   * second element of each, etc. Each array is built from the iterators in the order in which
+   * `zipPartitions` supplies them.
+   *
+   * Assumes that all the RDDs have the *same number of partitions* and the *same number of
+   * elements in each partition* (e.g. each one was made through a map on another).
+   *
+   * NOTE(review): if the partitions being zipped differ in length, iteration stops as soon as
+   * any one iterator is exhausted, so the surplus elements are silently dropped rather than
+   * reported as an error. Confirm that this is the intended contract.
+   *
+   * @param others the RDDs to zip with this one
+   * @return an RDD containing one `Array[Any]` per aligned group of elements
+   */
+  def zip(others: Seq[RDD[_]]): RDD[Array[Any]] = {
+    zipPartitions(others, preservesPartitioning = false) { iterSeq: Seq[Iterator[Any]] =>
+      new Iterator[Array[Any]] {
+        // A further row exists only while every underlying iterator still has an element.
+        def hasNext: Boolean = iterSeq.forall(_.hasNext)
+        // Advances every underlying iterator by exactly one element.
+        def next(): Array[Any] = iterSeq.map(_.next()).toArray
+      }
+    }
+  }
/**
* Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
@@ -748,7 +763,11 @@ abstract class RDD[T: ClassTag](
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false)
-
+  /**
+   * Zip this RDD's partitions with those of an arbitrary number of other RDDs and return a new
+   * RDD by applying `f` to the zipped partitions.
+   *
+   * @param others the RDDs to zip with this one; this RDD is prepended to the sequence before it
+   *               is handed to the resulting RDD
+   * @param preservesPartitioning forwarded unchanged to the resulting zipped RDD
+   * @param f function turning the sequence of per-RDD partition iterators into the output
+   *          iterator; it is passed through `sc.clean` before use
+   * @return the new zipped-partitions RDD
+   */
+  def zipPartitions[V: ClassTag]
+      (others: Seq[RDD[_]], preservesPartitioning: Boolean)
+      (f: (Seq[Iterator[Any]]) => Iterator[V]): RDD[V] =
+    // Forward the caller's flag; it was previously hard-coded to `false` and so ignored.
+    new ZippedPartitionsRDDn(sc, sc.clean(f), this +: others, preservesPartitioning)
+
// Actions (launch a job to return a value to the user program)
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index f3d30f6..d22d7d3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -146,3 +146,22 @@ private[spark] class ZippedPartitionsRDD4
rdd4 = null
}
}
+
+/**
+ * Zipped-partitions RDD over an arbitrary number of parent RDDs.
+ *
+ * @param sc the Spark context, passed to the base class
+ * @param f function applied in `compute` to the sequence of parent partition iterators
+ * @param rddSeq the parent RDDs, passed to the base class; declared `var` so the reference
+ *               can be released in `clearDependencies`
+ * @param preservesPartitioning passed to the base class
+ */
+private[spark] class ZippedPartitionsRDDn
+  [V: ClassTag](
+    sc: SparkContext,
+    f: (Seq[Iterator[_]] => Iterator[V]),
+    var rddSeq: Seq[RDD[_]],
+    preservesPartitioning: Boolean = false)
+  extends ZippedPartitionsBaseRDD[V](sc, rddSeq, preservesPartitioning) {
+
+  /**
+   * Opens one iterator per parent RDD, using the partition found at the same index inside the
+   * zipped partition `s`, and applies `f` to the resulting sequence.
+   */
+  override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+    val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
+    f(rdds.zipWithIndex.map { case (rdd, i) => rdd.iterator(partitions(i), context) })
+  }
+
+  /** Releases every reference this RDD holds to its parent RDDs. */
+  override def clearDependencies(): Unit = {
+    super.clearDependencies()
+    rdds = null
+    // `rddSeq` is a field of this class (it is a `var` parameter); without clearing it the
+    // parent RDDs would stay reachable from this instance even after `rdds` has been nulled.
+    rddSeq = null
+  }
+}
(END)
> support rdd.zip(rdd1, rdd2,...) with variable number of rdds as params
> ----------------------------------------------------------------------
>
> Key: SPARK-3489
> URL: https://issues.apache.org/jira/browse/SPARK-3489
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 1.0.2
> Reporter: Mohit Jaggi
> Fix For: 1.0.3
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org