You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/20 21:24:17 UTC
spark git commit: [SPARK-13386][GRAPHX] ConnectedComponents should
support maxIteration option
Repository: spark
Updated Branches:
refs/heads/master 9ca79c1ec -> 6ce7c481d
[SPARK-13386][GRAPHX] ConnectedComponents should support maxIteration option
JIRA: https://issues.apache.org/jira/browse/SPARK-13386
## What changes were proposed in this pull request?
Add a maxIterations option to the ConnectedComponents algorithm.
## How was this patch tested?
unit tests passed
Author: Zheng RuiFeng <ru...@foxmail.com>
Closes #11268 from zhengruifeng/ccwithmax.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ce7c481
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ce7c481
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ce7c481
Branch: refs/heads/master
Commit: 6ce7c481dc3a94af503d0f3f86e2be7ba82b3bbc
Parents: 9ca79c1
Author: Zheng RuiFeng <ru...@foxmail.com>
Authored: Sat Feb 20 12:24:10 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Feb 20 12:24:10 2016 -0800
----------------------------------------------------------------------
.../org/apache/spark/graphx/GraphOps.scala | 14 ++++++++++--
.../spark/graphx/lib/ConnectedComponents.scala | 24 ++++++++++++++++----
2 files changed, 32 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6ce7c481/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index d048fb5..97a8223 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -406,13 +406,23 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
}
/**
+ * Compute the connected component membership of each vertex and return a graph with the vertex
+ * value containing the lowest vertex id in the connected component containing that vertex.
+ *
+ * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
+ */
+ def connectedComponents(): Graph[VertexId, ED] = {
+ ConnectedComponents.run(graph)
+ }
+
+ /**
* Compute the connected component membership of each vertex and return a graph with the vertex
* value containing the lowest vertex id in the connected component containing that vertex.
*
* @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
*/
- def connectedComponents(): Graph[VertexId, ED] = {
- ConnectedComponents.run(graph)
+ def connectedComponents(maxIterations: Int): Graph[VertexId, ED] = {
+ ConnectedComponents.run(graph, maxIterations)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/6ce7c481/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index f72cbb1..40cf073 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -29,13 +29,14 @@ object ConnectedComponents {
*
* @tparam VD the vertex attribute type (discarded in the computation)
* @tparam ED the edge attribute type (preserved in the computation)
- *
* @param graph the graph for which to compute the connected components
- *
+ * @param maxIterations the maximum number of iterations to run for
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
- def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
+ maxIterations: Int): Graph[VertexId, ED] = {
+ require(maxIterations > 0)
val ccGraph = graph.mapVertices { case (vid, _) => vid }
def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
if (edge.srcAttr < edge.dstAttr) {
@@ -47,11 +48,26 @@ object ConnectedComponents {
}
}
val initialMessage = Long.MaxValue
- val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
+ val pregelGraph = Pregel(ccGraph, initialMessage,
+ maxIterations, EdgeDirection.Either)(
vprog = (id, attr, msg) => math.min(attr, msg),
sendMsg = sendMessage,
mergeMsg = (a, b) => math.min(a, b))
ccGraph.unpersist()
pregelGraph
} // end of connectedComponents
+
+ /**
+ * Compute the connected component membership of each vertex and return a graph with the vertex
+ * value containing the lowest vertex id in the connected component containing that vertex.
+ *
+ * @tparam VD the vertex attribute type (discarded in the computation)
+ * @tparam ED the edge attribute type (preserved in the computation)
+ * @param graph the graph for which to compute the connected components
+ * @return a graph with vertex attributes containing the smallest vertex in each
+ * connected component
+ */
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
+ run(graph, Int.MaxValue)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org