You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/06/04 05:51:27 UTC

git commit: Enable repartitioning of graph over different number of partitions

Repository: spark
Updated Branches:
  refs/heads/master e8d93ee52 -> 5284ca78d


Enable repartitioning of graph over different number of partitions

It is currently very difficult to repartition a graph over a different number of partitions.  This PR adds an additional `partitionBy` function that takes the number of partitions.

Author: Joseph E. Gonzalez <jo...@gmail.com>

Closes #719 from jegonzal/graph_partitioning_options and squashes the following commits:

730b405 [Joseph E. Gonzalez] adding an additional number of partitions option to partitionBy


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5284ca78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5284ca78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5284ca78

Branch: refs/heads/master
Commit: 5284ca78d17fb4de9a7019f3bbecf86484c13763
Parents: e8d93ee
Author: Joseph E. Gonzalez <jo...@gmail.com>
Authored: Tue Jun 3 20:49:14 2014 -0700
Committer: Ankur Dave <an...@gmail.com>
Committed: Tue Jun 3 20:49:14 2014 -0700

----------------------------------------------------------------------
 graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 10 ++++++++++
 .../scala/org/apache/spark/graphx/PartitionStrategy.scala |  8 +++++---
 .../scala/org/apache/spark/graphx/impl/GraphImpl.scala    |  6 +++++-
 3 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index c4f9d65..14ae50e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -106,10 +106,20 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
 
   /**
    * Repartitions the edges in the graph according to `partitionStrategy`.
+   *
+   * @param partitionStrategy the strategy to use when partitioning the edges in the graph.
    */
   def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
 
   /**
+   * Repartitions the edges into `numPartitions` partitions according to `partitionStrategy`.
+   *
+   * @param partitionStrategy the strategy to use when partitioning the edges in the graph.
+   * @param numPartitions the number of edge partitions in the new graph.
+   */
+  def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
+
+  /**
    * Transforms each vertex attribute in the graph using the map function.
    *
    * @note The new graph has the same structure.  As a consequence the underlying index structures

http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index ef412cf..5e7e72a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -114,9 +114,11 @@ object PartitionStrategy {
    */
   case object CanonicalRandomVertexCut extends PartitionStrategy {
     override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
-      val lower = math.min(src, dst)
-      val higher = math.max(src, dst)
-      math.abs((lower, higher).hashCode()) % numParts
+      if (src < dst) {
+        math.abs((src, dst).hashCode()) % numParts
+      } else {
+        math.abs((dst, src).hashCode()) % numParts
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 59d9a88..15ea05c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -74,7 +74,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   }
 
   override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
-    val numPartitions = edges.partitions.size
+    partitionBy(partitionStrategy, edges.partitions.size)
+  }
+
+  override def partitionBy(
+      partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
     val edTag = classTag[ED]
     val vdTag = classTag[VD]
     val newEdges = edges.withPartitionsRDD(edges.map { e =>