You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/20 21:24:17 UTC

spark git commit: [SPARK-13386][GRAPHX] ConnectedComponents should support maxIteration option

Repository: spark
Updated Branches:
  refs/heads/master 9ca79c1ec -> 6ce7c481d


[SPARK-13386][GRAPHX] ConnectedComponents should support maxIteration option

JIRA: https://issues.apache.org/jira/browse/SPARK-13386

## What changes were proposed in this pull request?

Add a `maxIterations` option to the ConnectedComponents algorithm.

## How was this patch tested?

unit tests passed

Author: Zheng RuiFeng <ru...@foxmail.com>

Closes #11268 from zhengruifeng/ccwithmax.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ce7c481
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ce7c481
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ce7c481

Branch: refs/heads/master
Commit: 6ce7c481dc3a94af503d0f3f86e2be7ba82b3bbc
Parents: 9ca79c1
Author: Zheng RuiFeng <ru...@foxmail.com>
Authored: Sat Feb 20 12:24:10 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Feb 20 12:24:10 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/graphx/GraphOps.scala      | 14 ++++++++++--
 .../spark/graphx/lib/ConnectedComponents.scala  | 24 ++++++++++++++++----
 2 files changed, 32 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ce7c481/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index d048fb5..97a8223 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -406,13 +406,23 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
   }
 
   /**
+    * Compute the connected component membership of each vertex and return a graph with the vertex
+    * value containing the lowest vertex id in the connected component containing that vertex.
+    *
+    * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
+    */
+  def connectedComponents(): Graph[VertexId, ED] = {
+    ConnectedComponents.run(graph)
+  }
+
+  /**
    * Compute the connected component membership of each vertex and return a graph with the vertex
    * value containing the lowest vertex id in the connected component containing that vertex.
    *
    * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
    */
-  def connectedComponents(): Graph[VertexId, ED] = {
-    ConnectedComponents.run(graph)
+  def connectedComponents(maxIterations: Int): Graph[VertexId, ED] = {
+    ConnectedComponents.run(graph, maxIterations)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce7c481/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index f72cbb1..40cf073 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -29,13 +29,14 @@ object ConnectedComponents {
    *
    * @tparam VD the vertex attribute type (discarded in the computation)
    * @tparam ED the edge attribute type (preserved in the computation)
-   *
    * @param graph the graph for which to compute the connected components
-   *
+   * @param maxIterations the maximum number of iterations to run for
    * @return a graph with vertex attributes containing the smallest vertex in each
    *         connected component
    */
-  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
+  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
+                                      maxIterations: Int): Graph[VertexId, ED] = {
+    require(maxIterations > 0)
     val ccGraph = graph.mapVertices { case (vid, _) => vid }
     def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
       if (edge.srcAttr < edge.dstAttr) {
@@ -47,11 +48,26 @@ object ConnectedComponents {
       }
     }
     val initialMessage = Long.MaxValue
-    val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
+    val pregelGraph = Pregel(ccGraph, initialMessage,
+      maxIterations, EdgeDirection.Either)(
       vprog = (id, attr, msg) => math.min(attr, msg),
       sendMsg = sendMessage,
       mergeMsg = (a, b) => math.min(a, b))
     ccGraph.unpersist()
     pregelGraph
   } // end of connectedComponents
+
+  /**
+    * Compute the connected component membership of each vertex and return a graph with the vertex
+    * value containing the lowest vertex id in the connected component containing that vertex.
+    *
+    * @tparam VD the vertex attribute type (discarded in the computation)
+    * @tparam ED the edge attribute type (preserved in the computation)
+    * @param graph the graph for which to compute the connected components
+    * @return a graph with vertex attributes containing the smallest vertex in each
+    *         connected component
+    */
+  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
+    run(graph, Int.MaxValue)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org