Posted to commits@spark.apache.org by an...@apache.org on 2015/07/14 22:15:07 UTC

spark git commit: [SPARK-8718] [GRAPHX] Improve EdgePartition2D for non perfect square number of partitions

Repository: spark
Updated Branches:
  refs/heads/master d267c2834 -> 0a4071eab


[SPARK-8718] [GRAPHX] Improve EdgePartition2D for non perfect square number of partitions

See https://github.com/aray/e2d/blob/master/EdgePartition2D.ipynb

Author: Andrew Ray <ra...@gmail.com>

Closes #7104 from aray/edge-partition-2d-improvement and squashes the following commits:

3729f84 [Andrew Ray] correct bounds and remove unneeded comments
97f8464 [Andrew Ray] change less
5141ab4 [Andrew Ray] Merge branch 'master' into edge-partition-2d-improvement
925fd2c [Andrew Ray] use new interface for partitioning
001bfd0 [Andrew Ray] Refactor PartitionStrategy so that we can return a partition function for a given number of parts. To keep compatibility we define default methods that translate between the two implementation options. Made EdgePartition2D use old strategy when we have a perfect square and implement new interface.
5d42105 [Andrew Ray] % -> /
3560084 [Andrew Ray] Merge branch 'master' into edge-partition-2d-improvement
f006364 [Andrew Ray] remove unneeded comments
cfa2c5e [Andrew Ray] Modifications to EdgePartition2D so that it works for non perfect squares.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a4071ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a4071ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a4071ea

Branch: refs/heads/master
Commit: 0a4071eab30db1db80f61ed2cb2e7243291183ce
Parents: d267c28
Author: Andrew Ray <ra...@gmail.com>
Authored: Tue Jul 14 13:14:47 2015 -0700
Committer: Ankur Dave <an...@gmail.com>
Committed: Tue Jul 14 13:14:47 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/graphx/PartitionStrategy.scala | 32 +++++++++++++-------
 1 file changed, 21 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0a4071ea/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 7372dfb..70a7592 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -32,7 +32,7 @@ trait PartitionStrategy extends Serializable {
 object PartitionStrategy {
   /**
    * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
-   * guaranteeing a `2 * sqrt(numParts) - 1` bound on vertex replication.
+   * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
    *
    * Suppose we have a graph with 12 vertices that we want to partition
    * over 9 machines.  We can use the following sparse matrix representation:
@@ -61,26 +61,36 @@ object PartitionStrategy {
    * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3,
    * P6)` or the last
    * row of blocks `(P6, P7, P8)`.  As a consequence we can guarantee that `v11` will need to be
-   * replicated to at most `2 * sqrt(numParts) - 1` machines.
+   * replicated to at most `2 * sqrt(numParts)` machines.
    *
    * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
    * balance.  To improve balance we first multiply each vertex id by a large prime to shuffle the
    * vertex locations.
    *
-   * One of the limitations of this approach is that the number of machines must either be a
-   * perfect square. We partially address this limitation by computing the machine assignment to
-   * the next
-   * largest perfect square and then mapping back down to the actual number of machines.
-   * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect
-   * square is used.
+   * When the number of partitions requested is not a perfect square we use a slightly different
+   * method where the last column can have a different number of rows than the others while still
+   * maintaining the same size per block.
    */
   case object EdgePartition2D extends PartitionStrategy {
     override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
       val mixingPrime: VertexId = 1125899906842597L
-      val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
-      val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
-      (col * ceilSqrtNumParts + row) % numParts
+      if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
+        // Use old method for perfect squared to ensure we get same results
+        val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
+        val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
+        (col * ceilSqrtNumParts + row) % numParts
+
+      } else {
+        // Otherwise use new method
+        val cols = ceilSqrtNumParts
+        val rows = (numParts + cols - 1) / cols
+        val lastColRows = numParts - rows * (cols - 1)
+        val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
+        val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
+        col * rows + row
+
+      }
     }
   }
 


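The arithmetic in the new branch can be tried outside Spark with a small standalone sketch. This is not the GraphX source: it assumes VertexId can be modelled as Long and PartitionID as Int, and the names EdgePartition2DSketch and gridShape are hypothetical helpers introduced only for illustration. The formulas themselves are copied from the diff above.

```scala
// Standalone sketch of the partitioning arithmetic in this commit.
// Assumption: VertexId is modelled as Long and PartitionID as Int,
// so the code runs without a Spark dependency.
object EdgePartition2DSketch {
  private val mixingPrime: Long = 1125899906842597L

  /** Hypothetical helper: grid shape as (cols, rows, lastColRows). */
  def gridShape(numParts: Int): (Int, Int, Int) = {
    val cols = math.ceil(math.sqrt(numParts)).toInt
    val rows = (numParts + cols - 1) / cols         // integer ceiling of numParts / cols
    val lastColRows = numParts - rows * (cols - 1)  // blocks left over for the last column
    (cols, rows, lastColRows)
  }

  def getPartition(src: Long, dst: Long, numParts: Int): Int = {
    val (cols, rows, lastColRows) = gridShape(numParts)
    if (numParts == cols * cols) {
      // Perfect square: same arithmetic as before the commit.
      val col = (math.abs(src * mixingPrime) % cols).toInt
      val row = (math.abs(dst * mixingPrime) % cols).toInt
      (col * cols + row) % numParts
    } else {
      // Not a perfect square: the last column may hold fewer blocks.
      val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
      val rowsInCol = if (col < cols - 1) rows else lastColRows
      val row = (math.abs(dst * mixingPrime) % rowsInCol).toInt
      col * rows + row
    }
  }

  def main(args: Array[String]): Unit = {
    val numParts = 10
    println(s"grid shape for $numParts partitions: ${gridShape(numParts)}")
    val used = for (s <- 0 until 100; d <- 0 until 100)
      yield getPartition(s.toLong, d.toLong, numParts)
    println(s"distinct partitions used: ${used.distinct.sorted.mkString(", ")}")
  }
}
```

For numParts = 10 the formulas give 4 columns with 3 rows each, except the last column, which has a single row: 3 * 3 + 1 = 10 blocks, so every partition id from 0 to 9 corresponds to exactly one block and no modulo wrap-around is needed.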