Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:59:52 UTC

[28/50] git commit: Moved PartitionStrategy implementations into an object.

Moved PartitionStrategy implementations into an object.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1dce9ce4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1dce9ce4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1dce9ce4

Branch: refs/heads/master
Commit: 1dce9ce446dd248755cd65b7a6a0729a4dca2d62
Parents: ae06d2c
Author: Reynold Xin <rx...@apache.org>
Authored: Mon Jan 13 18:32:04 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Jan 13 18:32:04 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/graphx/PartitionStrategy.scala | 158 ++++++++++---------
 .../org/apache/spark/graphx/lib/Analytics.scala |   2 +
 .../org/apache/spark/graphx/GraphSuite.scala    |   1 +
 .../spark/graphx/lib/TriangleCountSuite.scala   |   5 +-
 4 files changed, 85 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
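
For call sites, the practical effect of this move is that the strategy objects are no longer top-level: they must now be reached through the PartitionStrategy companion object, either via the wildcard import added to the files below or by full qualification. A minimal sketch, assuming the GraphX API as of this commit:

    import org.apache.spark.graphx._
    import org.apache.spark.graphx.PartitionStrategy._  // brings the strategies into scope

    // Either form refers to the same singleton; getPartition behavior is
    // unchanged by the move.
    val byImport:  PartitionStrategy = RandomVertexCut
    val qualified: PartitionStrategy = PartitionStrategy.CanonicalRandomVertexCut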


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1dce9ce4/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index b9ccd87..6d2990a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -4,96 +4,100 @@ package org.apache.spark.graphx
  * Represents the way edges are assigned to edge partitions based on their source and destination
  * vertex IDs.
  */
-sealed trait PartitionStrategy extends Serializable {
+trait PartitionStrategy extends Serializable {
   /** Returns the partition number for a given edge. */
   def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
 }
 
-
 /**
- * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
- * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
- *
- * Suppose we have a graph with 11 vertices that we want to partition
- * over 9 machines.  We can use the following sparse matrix representation:
- *
- * <pre>
- *       __________________________________
- *  v0   | P0 *     | P1       | P2    *  |
- *  v1   |  ****    |  *       |          |
- *  v2   |  ******* |      **  |  ****    |
- *  v3   |  *****   |  *  *    |       *  |
- *       ----------------------------------
- *  v4   | P3 *     | P4 ***   | P5 **  * |
- *  v5   |  *  *    |  *       |          |
- *  v6   |       *  |      **  |  ****    |
- *  v7   |  * * *   |  *  *    |       *  |
- *       ----------------------------------
- *  v8   | P6   *   | P7    *  | P8  *   *|
- *  v9   |     *    |  *    *  |          |
- *  v10  |       *  |      **  |  *  *    |
- *  v11  | * <-E    |  ***     |       ** |
- *       ----------------------------------
- * </pre>
- *
- * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
- * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks.  Notice
- * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last
- * row of blocks `(P6, P7, P8)`.  As a consequence we can guarantee that `v11` will need to be
- * replicated to at most `2 * sqrt(numParts)` machines.
- *
- * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
- * balance.  To improve balance we first multiply each vertex id by a large prime to shuffle the
- * vertex locations.
- *
- * One of the limitations of this approach is that the number of machines must either be a perfect
- * square. We partially address this limitation by computing the machine assignment to the next
- * largest perfect square and then mapping back down to the actual number of machines.
- * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square
- * is used.
+ * Collection of built-in [[PartitionStrategy]] implementations.
  */
-case object EdgePartition2D extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
-    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
-    val mixingPrime: VertexID = 1125899906842597L
-    val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
-    val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
-    (col * ceilSqrtNumParts + row) % numParts
+object PartitionStrategy {
+  /**
+   * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
+   * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
+   *
+   * Suppose we have a graph with 12 vertices (v0 through v11) that we want to partition
+   * over 9 machines.  We can use the following sparse matrix representation:
+   *
+   * <pre>
+   *       __________________________________
+   *  v0   | P0 *     | P1       | P2    *  |
+   *  v1   |  ****    |  *       |          |
+   *  v2   |  ******* |      **  |  ****    |
+   *  v3   |  *****   |  *  *    |       *  |
+   *       ----------------------------------
+   *  v4   | P3 *     | P4 ***   | P5 **  * |
+   *  v5   |  *  *    |  *       |          |
+   *  v6   |       *  |      **  |  ****    |
+   *  v7   |  * * *   |  *  *    |       *  |
+   *       ----------------------------------
+   *  v8   | P6   *   | P7    *  | P8  *   *|
+   *  v9   |     *    |  *    *  |          |
+   *  v10  |       *  |      **  |  *  *    |
+   *  v11  | * <-E    |  ***     |       ** |
+   *       ----------------------------------
+   * </pre>
+   *
+   * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
+   * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks.  Notice
+   * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last
+   * row of blocks `(P6, P7, P8)`.  As a consequence we can guarantee that `v11` will need to be
+   * replicated to at most `2 * sqrt(numParts)` machines.
+   *
+   * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
+   * balance.  To improve balance we first multiply each vertex id by a large prime to shuffle the
+   * vertex locations.
+   *
+   * One limitation of this approach is that the number of machines must be a perfect square.
+   * We partially address this limitation by computing the machine assignment to the next
+   * largest perfect square and then mapping back down to the actual number of machines.
+   * Unfortunately, this can also lead to work imbalance, so using a perfect square number of
+   * machines is recommended.
+   */
+  case object EdgePartition2D extends PartitionStrategy {
+    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+      val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
+      val mixingPrime: VertexID = 1125899906842597L
+      val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
+      val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
+      (col * ceilSqrtNumParts + row) % numParts
+    }
   }
-}
 
-/**
- * Assigns edges to partitions using only the source vertex ID, colocating edges with the same
- * source.
- */
-case object EdgePartition1D extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
-    val mixingPrime: VertexID = 1125899906842597L
-    (math.abs(src) * mixingPrime).toInt % numParts
+  /**
+   * Assigns edges to partitions using only the source vertex ID, colocating edges with the same
+   * source.
+   */
+  case object EdgePartition1D extends PartitionStrategy {
+    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+      val mixingPrime: VertexID = 1125899906842597L
+      (math.abs(src) * mixingPrime).toInt % numParts
+    }
   }
-}
 
 
-/**
- * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a
- * random vertex cut that colocates all same-direction edges between two vertices.
- */
-case object RandomVertexCut extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
-    math.abs((src, dst).hashCode()) % numParts
+  /**
+   * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a
+   * random vertex cut that colocates all same-direction edges between two vertices.
+   */
+  case object RandomVertexCut extends PartitionStrategy {
+    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+      math.abs((src, dst).hashCode()) % numParts
+    }
   }
-}
 
 
-/**
- * Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical
- * direction, resulting in a random vertex cut that colocates all edges between two vertices,
- * regardless of direction.
- */
-case object CanonicalRandomVertexCut extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
-    val lower = math.min(src, dst)
-    val higher = math.max(src, dst)
-    math.abs((lower, higher).hashCode()) % numParts
+  /**
+   * Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical
+   * direction, resulting in a random vertex cut that colocates all edges between two vertices,
+   * regardless of direction.
+   */
+  case object CanonicalRandomVertexCut extends PartitionStrategy {
+    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+      val lower = math.min(src, dst)
+      val higher = math.max(src, dst)
+      math.abs((lower, higher).hashCode()) % numParts
+    }
   }
 }
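
To make the 2D scheme concrete, here is a worked example of EdgePartition2D under the constants in the code above. With numParts = 9, ceilSqrtNumParts = 3, and since the mixing prime 1125899906842597 is congruent to 1 mod 3, the column and row for small vertex IDs reduce to src % 3 and dst % 3. A minimal sketch, assuming VertexID = Long and PartitionID = Int as in this file:

    // src = 5, dst = 7, numParts = 9:
    //   col = (5 * 1125899906842597L) % 3 = 2
    //   row = (7 * 1125899906842597L) % 3 = 1
    //   partition = (col * 3 + row) % 9 = 7
    assert(PartitionStrategy.EdgePartition2D.getPartition(5L, 7L, 9) == 7)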

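The contrast between the two hash-based strategies can also be checked directly: CanonicalRandomVertexCut orders the endpoints before hashing, so both directions of an edge always land in the same partition, while RandomVertexCut hashes (src, dst) as given and typically separates them. A small sketch:

    // Both calls hash the canonical pair (1, 2), so they must agree.
    assert(PartitionStrategy.CanonicalRandomVertexCut.getPartition(1L, 2L, 12)
        == PartitionStrategy.CanonicalRandomVertexCut.getPartition(2L, 1L, 12))
    // RandomVertexCut hashes (1L, 2L) and (2L, 1L) independently, so the
    // two directions of an edge may land in different partitions.
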
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1dce9ce4/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index d5e1de1..e0aff56 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -2,6 +2,7 @@ package org.apache.spark.graphx.lib
 
 import org.apache.spark._
 import org.apache.spark.graphx._
+import org.apache.spark.graphx.PartitionStrategy._
 
 /**
  * Driver program for running graph algorithms.
@@ -20,6 +21,7 @@ object Analytics extends Logging {
     }
 
     def pickPartitioner(v: String): PartitionStrategy = {
+      // TODO: Use reflection rather than listing all the partitioning strategies here.
       v match {
         case "RandomVertexCut" => RandomVertexCut
         case "EdgePartition1D" => EdgePartition1D
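
The TODO above suggests replacing the explicit match with reflection. A hedged sketch of one possible approach: the compiled class names of the form PartitionStrategy$RandomVertexCut$ and the MODULE$ singleton field follow from how scalac encodes nested objects, and pickPartitionerByReflection is a hypothetical helper, not part of this commit.

    def pickPartitionerByReflection(v: String): PartitionStrategy = {
      // Nested case objects compile to Outer$Inner$ classes carrying a
      // static MODULE$ field that holds the singleton instance.
      val clazz = Class.forName("org.apache.spark.graphx.PartitionStrategy$" + v + "$")
      clazz.getField("MODULE$").get(null).asInstanceOf[PartitionStrategy]
    }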

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1dce9ce4/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index c32a6cb..9587f04 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -4,6 +4,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
+import org.apache.spark.graphx.PartitionStrategy._
 import org.apache.spark.rdd._
 
 class GraphSuite extends FunSuite with LocalSparkContext {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1dce9ce4/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
index a286b7d..3452ce9 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
@@ -2,11 +2,8 @@ package org.apache.spark.graphx.lib
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.GraphGenerators
-import org.apache.spark.rdd._
+import org.apache.spark.graphx.PartitionStrategy.RandomVertexCut
 
 
 class TriangleCountSuite extends FunSuite with LocalSparkContext {