You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/06 20:01:47 UTC

[1/2] git commit: Memoize preferred locations in ZippedPartitionsBaseRDD so preferred location computation doesn't lead to exponential explosion.

Updated Branches:
  refs/heads/master 078049877 -> 87676a6af


Memoize preferred locations in ZippedPartitionsBaseRDD so preferred location computation doesn't lead to exponential explosion.

(cherry picked from commit e36fe55a031d2c01c9d7c5d85965951c681a0c74)
Signed-off-by: Reynold Xin <rx...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/9cf7f31e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/9cf7f31e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/9cf7f31e

Branch: refs/heads/master
Commit: 9cf7f31e4d4e542b88b6a474bdf08d07fdd3652c
Parents: 743a31a
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Nov 30 18:07:36 2013 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Nov 30 18:10:52 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  | 27 ++++++++------------
 1 file changed, 11 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9cf7f31e/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index faeb316..a97d2a0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -22,7 +22,8 @@ import java.io.{ObjectOutputStream, IOException}
 
 private[spark] class ZippedPartitionsPartition(
     idx: Int,
-    @transient rdds: Seq[RDD[_]])
+    @transient rdds: Seq[RDD[_]],
+    @transient val preferredLocations: Seq[String])
   extends Partition {
 
   override val index: Int = idx
@@ -47,27 +48,21 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
     if (preservesPartitioning) firstParent[Any].partitioner else None
 
   override def getPartitions: Array[Partition] = {
-    val sizes = rdds.map(x => x.partitions.size)
-    if (!sizes.forall(x => x == sizes(0))) {
+    val numParts = rdds.head.partitions.size
+    if (!rdds.forall(rdd => rdd.partitions.size == numParts)) {
       throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
     }
-    val array = new Array[Partition](sizes(0))
-    for (i <- 0 until sizes(0)) {
-      array(i) = new ZippedPartitionsPartition(i, rdds)
+    Array.tabulate[Partition](numParts) { i =>
+      val prefs = rdds.map(rdd => rdd.preferredLocations(rdd.partitions(i)))
+      // Check whether there are any hosts that match all RDDs; otherwise return the union
+      val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
+      val locs = if (!exactMatchLocations.isEmpty) exactMatchLocations else prefs.flatten.distinct
+      new ZippedPartitionsPartition(i, rdds, locs)
     }
-    array
   }
 
   override def getPreferredLocations(s: Partition): Seq[String] = {
-    val parts = s.asInstanceOf[ZippedPartitionsPartition].partitions
-    val prefs = rdds.zip(parts).map { case (rdd, p) => rdd.preferredLocations(p) }
-    // Check whether there are any hosts that match all RDDs; otherwise return the union
-    val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
-    if (!exactMatchLocations.isEmpty) {
-      exactMatchLocations
-    } else {
-      prefs.flatten.distinct
-    }
+    s.asInstanceOf[ZippedPartitionsPartition].preferredLocations
   }
 
   override def clearDependencies() {


[2/2] git commit: Merge pull request #220 from rxin/zippart

Posted by ma...@apache.org.
Merge pull request #220 from rxin/zippart

Memoize preferred locations in ZippedPartitionsBaseRDD so preferred location computation doesn't lead to exponential explosion.

This was a problem in GraphX, where we have a whole chain of RDDs that are ZippedPartitionsRDDs, and the preferred locations were taking an eternity to compute.

(cherry picked from commit e36fe55a031d2c01c9d7c5d85965951c681a0c74)
Signed-off-by: Reynold Xin <rx...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/87676a6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/87676a6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/87676a6a

Branch: refs/heads/master
Commit: 87676a6af2c8fc33c5b5d4e7eb45e3e8558f3c33
Parents: 0780498 9cf7f31
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Fri Dec 6 11:01:42 2013 -0800
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Fri Dec 6 11:01:42 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  | 27 ++++++++------------
 1 file changed, 11 insertions(+), 16 deletions(-)
----------------------------------------------------------------------