You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Mohit Jaggi (JIRA)" <ji...@apache.org> on 2014/09/17 06:02:33 UTC

[jira] [Commented] (SPARK-3489) support rdd.zip(rdd1, rdd2,...) with variable number of rdds as params

    [ https://issues.apache.org/jira/browse/SPARK-3489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14136738#comment-14136738 ] 

Mohit Jaggi commented on SPARK-3489:
------------------------------------

Proposed diff ---

MohitMacBook:spark mohit$ git diff
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a9b905b..2c9f034 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -711,6 +711,21 @@ abstract class RDD[T: ClassTag](
       }
     }
   }
+  
+  /**
+   * Zips this RDD with a sequence of other RDDs, returning arrays that hold the first element
+   * of each RDD, the second element of each RDD, etc., with this RDD's element at index 0.
+   * Assumes that all the RDDs have the *same number of partitions* and the *same number of
+   * elements in each partition* (e.g. each one was made through a map on this RDD).
+   */
+  def zip(others: Seq[RDD[_]]): RDD[Array[Any]] = {
+    zipPartitions(others, preservesPartitioning = false) { iterSeq: Seq[Iterator[Any]] =>
+      new Iterator[Array[Any]] {
+        def hasNext: Boolean = iterSeq.forall(_.hasNext) // ends with the shortest iterator
+        def next(): Array[Any] = iterSeq.map(_.next()).toArray
+      }
+    }
+  }
 
   /**
    * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
@@ -748,7 +763,11 @@ abstract class RDD[T: ClassTag](
       (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
     new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false)
 
-
+  def zipPartitions[V: ClassTag]
+      (others: Seq[RDD[_]], preservesPartitioning: Boolean)
+      (f: (Seq[Iterator[Any]]) => Iterator[V]): RDD[V] = // f sees this RDD's iterator first
+    new ZippedPartitionsRDDn(sc, sc.clean(f), this +: others, preservesPartitioning)
+  
   // Actions (launch a job to return a value to the user program)
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index f3d30f6..d22d7d3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -146,3 +146,22 @@ private[spark] class ZippedPartitionsRDD4
     rdd4 = null
   }
 }
+
+/** Zips the same-indexed partitions of every RDD in `rddSeq`, applying `f` to their iterators. */
+private[spark] class ZippedPartitionsRDDn[V: ClassTag](
+    sc: SparkContext,
+    f: (Seq[Iterator[_]] => Iterator[V]),
+    var rddSeq: Seq[RDD[_]],
+    preservesPartitioning: Boolean = false)
+  extends ZippedPartitionsBaseRDD[V](sc, rddSeq, preservesPartitioning) {
+  override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+    val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
+    // The i-th parent RDD reads the i-th partition carried by the zipped partition.
+    f(rdds.zipWithIndex.map { case (rdd, i) => rdd.iterator(partitions(i), context) })
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    rddSeq = null // `var` constructor param is a field too; release its hold on the parents
+    rdds = null
+  }
+}
(END) 

> support rdd.zip(rdd1, rdd2,...) with variable number of rdds as params
> ----------------------------------------------------------------------
>
>                 Key: SPARK-3489
>                 URL: https://issues.apache.org/jira/browse/SPARK-3489
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.0.2
>            Reporter: Mohit Jaggi
>             Fix For: 1.0.3
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org