You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/23 12:46:59 UTC
[1/2] git commit: Support preservesPartitioning in RDD.zipPartitions
Updated Branches:
refs/heads/master 086b097e3 -> 51aa9d6e9
Support preservesPartitioning in RDD.zipPartitions
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c1507afc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c1507afc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c1507afc
Branch: refs/heads/master
Commit: c1507afc6ca161608a83967cdebe1404051658d3
Parents: 086b097
Author: Ankur Dave <an...@gmail.com>
Authored: Sat Nov 23 02:32:37 2013 -0800
Committer: Ankur Dave <an...@gmail.com>
Committed: Sat Nov 23 03:03:31 2013 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/rdd/RDD.scala | 21 +++++++++++++++++---
.../apache/spark/rdd/ZippedPartitionsRDD.scala | 21 +++++++++++++-------
2 files changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c1507afc/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 6e88be6..7623c44 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -546,19 +546,34 @@ abstract class RDD[T: ClassManifest](
* of elements in each partition.
*/
def zipPartitions[B: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B], preservesPartitioning: Boolean)
+ (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
+ new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
+
+ def zipPartitions[B: ClassManifest, V: ClassManifest]
(rdd2: RDD[B])
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
+ new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false)
+
+ def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
+ (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
+ new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
(rdd2: RDD[B], rdd3: RDD[C])
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
+ new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false)
+
+ def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
+ (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
+ new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
+ new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false)
// Actions (launch a job to return a value to the user program)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c1507afc/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index 31e6fd5..faeb316 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -39,9 +39,13 @@ private[spark] class ZippedPartitionsPartition(
abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
sc: SparkContext,
- var rdds: Seq[RDD[_]])
+ var rdds: Seq[RDD[_]],
+ preservesPartitioning: Boolean = false)
extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) {
+ override val partitioner =
+ if (preservesPartitioning) firstParent[Any].partitioner else None
+
override def getPartitions: Array[Partition] = {
val sizes = rdds.map(x => x.partitions.size)
if (!sizes.forall(x => x == sizes(0))) {
@@ -76,8 +80,9 @@ class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest]
sc: SparkContext,
f: (Iterator[A], Iterator[B]) => Iterator[V],
var rdd1: RDD[A],
- var rdd2: RDD[B])
- extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
+ var rdd2: RDD[B],
+ preservesPartitioning: Boolean = false)
+ extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
@@ -97,8 +102,9 @@ class ZippedPartitionsRDD3
f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
var rdd1: RDD[A],
var rdd2: RDD[B],
- var rdd3: RDD[C])
- extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) {
+ var rdd3: RDD[C],
+ preservesPartitioning: Boolean = false)
+ extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) {
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
@@ -122,8 +128,9 @@ class ZippedPartitionsRDD4
var rdd1: RDD[A],
var rdd2: RDD[B],
var rdd3: RDD[C],
- var rdd4: RDD[D])
- extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) {
+ var rdd4: RDD[D],
+ preservesPartitioning: Boolean = false)
+ extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) {
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
[2/2] git commit: Merge pull request #198 from
ankurdave/zipPartitions-preservesPartitioning
Posted by rx...@apache.org.
Merge pull request #198 from ankurdave/zipPartitions-preservesPartitioning
Support preservesPartitioning in RDD.zipPartitions
In `RDD.zipPartitions`, add support for a `preservesPartitioning` option (similar to `RDD.mapPartitions`) that reuses the first RDD's partitioner.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/51aa9d6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/51aa9d6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/51aa9d6e
Branch: refs/heads/master
Commit: 51aa9d6e996895e15a6eb539b7049e6fe8a335e5
Parents: 086b097 c1507af
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Nov 23 19:46:46 2013 +0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Nov 23 19:46:46 2013 +0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/rdd/RDD.scala | 21 +++++++++++++++++---
.../apache/spark/rdd/ZippedPartitionsRDD.scala | 21 +++++++++++++-------
2 files changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------