You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/23 12:46:59 UTC

[1/2] git commit: Support preservesPartitioning in RDD.zipPartitions

Updated Branches:
  refs/heads/master 086b097e3 -> 51aa9d6e9


Support preservesPartitioning in RDD.zipPartitions


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c1507afc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c1507afc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c1507afc

Branch: refs/heads/master
Commit: c1507afc6ca161608a83967cdebe1404051658d3
Parents: 086b097
Author: Ankur Dave <an...@gmail.com>
Authored: Sat Nov 23 02:32:37 2013 -0800
Committer: Ankur Dave <an...@gmail.com>
Committed: Sat Nov 23 03:03:31 2013 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 21 +++++++++++++++++---
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  | 21 +++++++++++++-------
 2 files changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c1507afc/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 6e88be6..7623c44 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -546,19 +546,34 @@ abstract class RDD[T: ClassManifest](
    * of elements in each partition.
    */
   def zipPartitions[B: ClassManifest, V: ClassManifest]
+      (rdd2: RDD[B], preservesPartitioning: Boolean)
+      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
+
+  def zipPartitions[B: ClassManifest, V: ClassManifest]
       (rdd2: RDD[B])
       (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
-    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
+    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false)
+
+  def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+      (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
+      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
 
   def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
       (rdd2: RDD[B], rdd3: RDD[C])
       (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
-    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
+    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false)
+
+  def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
+      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
 
   def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
       (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
       (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
-    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
+    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false)
 
 
   // Actions (launch a job to return a value to the user program)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c1507afc/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index 31e6fd5..faeb316 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -39,9 +39,13 @@ private[spark] class ZippedPartitionsPartition(
 
 abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
     sc: SparkContext,
-    var rdds: Seq[RDD[_]])
+    var rdds: Seq[RDD[_]],
+    preservesPartitioning: Boolean = false)
   extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) {
 
+  override val partitioner =
+    if (preservesPartitioning) firstParent[Any].partitioner else None
+
   override def getPartitions: Array[Partition] = {
     val sizes = rdds.map(x => x.partitions.size)
     if (!sizes.forall(x => x == sizes(0))) {
@@ -76,8 +80,9 @@ class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest]
     sc: SparkContext,
     f: (Iterator[A], Iterator[B]) => Iterator[V],
     var rdd1: RDD[A],
-    var rdd2: RDD[B])
-  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
+    var rdd2: RDD[B],
+    preservesPartitioning: Boolean = false)
+  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
@@ -97,8 +102,9 @@ class ZippedPartitionsRDD3
     f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
     var rdd1: RDD[A],
     var rdd2: RDD[B],
-    var rdd3: RDD[C])
-  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) {
+    var rdd3: RDD[C],
+    preservesPartitioning: Boolean = false)
+  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
@@ -122,8 +128,9 @@ class ZippedPartitionsRDD4
     var rdd1: RDD[A],
     var rdd2: RDD[B],
     var rdd3: RDD[C],
-    var rdd4: RDD[D])
-  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) {
+    var rdd4: RDD[D],
+    preservesPartitioning: Boolean = false)
+  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions


[2/2] git commit: Merge pull request #198 from ankurdave/zipPartitions-preservesPartitioning

Posted by rx...@apache.org.
Merge pull request #198 from ankurdave/zipPartitions-preservesPartitioning

Support preservesPartitioning in RDD.zipPartitions

In `RDD.zipPartitions`, add support for a `preservesPartitioning` option (similar to `RDD.mapPartitions`) that reuses the first RDD's partitioner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/51aa9d6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/51aa9d6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/51aa9d6e

Branch: refs/heads/master
Commit: 51aa9d6e996895e15a6eb539b7049e6fe8a335e5
Parents: 086b097 c1507af
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Nov 23 19:46:46 2013 +0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Nov 23 19:46:46 2013 +0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 21 +++++++++++++++++---
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  | 21 +++++++++++++-------
 2 files changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------