You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/06/04 05:51:27 UTC
git commit: Enable repartitioning of graph over different number of
partitions
Repository: spark
Updated Branches:
refs/heads/master e8d93ee52 -> 5284ca78d
Enable repartitioning of graph over different number of partitions
It is currently very difficult to repartition a graph over a different number of partitions. This PR adds an additional `partitionBy` function that takes the number of partitions.
Author: Joseph E. Gonzalez <jo...@gmail.com>
Closes #719 from jegonzal/graph_partitioning_options and squashes the following commits:
730b405 [Joseph E. Gonzalez] adding an additional number of partitions option to partitionBy
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5284ca78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5284ca78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5284ca78
Branch: refs/heads/master
Commit: 5284ca78d17fb4de9a7019f3bbecf86484c13763
Parents: e8d93ee
Author: Joseph E. Gonzalez <jo...@gmail.com>
Authored: Tue Jun 3 20:49:14 2014 -0700
Committer: Ankur Dave <an...@gmail.com>
Committed: Tue Jun 3 20:49:14 2014 -0700
----------------------------------------------------------------------
graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 10 ++++++++++
.../scala/org/apache/spark/graphx/PartitionStrategy.scala | 8 +++++---
.../scala/org/apache/spark/graphx/impl/GraphImpl.scala | 6 +++++-
3 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index c4f9d65..14ae50e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -106,10 +106,20 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
/**
* Repartitions the edges in the graph according to `partitionStrategy`.
+ *
+ * @param partitionStrategy the strategy to use when partitioning the edges in the graph.
*/
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
/**
+ * Repartitions the edges in the graph according to `partitionStrategy` into `numPartitions` partitions.
+ *
+ * @param partitionStrategy the strategy to use when partitioning the edges in the graph.
+ * @param numPartitions the number of edge partitions in the new graph.
+ */
+ def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
+
+ /**
* Transforms each vertex attribute in the graph using the map function.
*
* @note The new graph has the same structure. As a consequence the underlying index structures
http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index ef412cf..5e7e72a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -114,9 +114,11 @@ object PartitionStrategy {
*/
case object CanonicalRandomVertexCut extends PartitionStrategy {
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
- val lower = math.min(src, dst)
- val higher = math.max(src, dst)
- math.abs((lower, higher).hashCode()) % numParts
+ if (src < dst) {
+ math.abs((src, dst).hashCode()) % numParts
+ } else {
+ math.abs((dst, src).hashCode()) % numParts
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 59d9a88..15ea05c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -74,7 +74,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}
override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
- val numPartitions = edges.partitions.size
+ partitionBy(partitionStrategy, edges.partitions.size)
+ }
+
+ override def partitionBy(
+ partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
val edTag = classTag[ED]
val vdTag = classTag[VD]
val newEdges = edges.withPartitionsRDD(edges.map { e =>