Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:59:39 UTC

[15/50] git commit: Improving documentation and identifying potential bug in CC calculation.

Improving documentation and identifying potential bug in CC calculation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/80e4d98d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/80e4d98d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/80e4d98d

Branch: refs/heads/master
Commit: 80e4d98dc656e20dacbd8cdbf92d4912673b42ae
Parents: 8ca9773
Author: Joseph E. Gonzalez <jo...@gmail.com>
Authored: Mon Jan 13 13:40:16 2014 -0800
Committer: Joseph E. Gonzalez <jo...@gmail.com>
Committed: Mon Jan 13 13:40:16 2014 -0800

----------------------------------------------------------------------
 docs/graphx-programming-guide.md                | 33 ++++++++++++---
 .../org/apache/spark/graphx/GraphOps.scala      |  4 +-
 .../spark/graphx/lib/ConnectedComponents.scala  | 44 +++++++++++++-------
 .../graphx/lib/ConnectedComponentsSuite.scala   | 30 +++++++++++++
 4 files changed, 89 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80e4d98d/docs/graphx-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 2697b2d..ed976b8 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -84,7 +84,8 @@ import org.apache.spark.graphx._
 import org.apache.spark.rdd.RDD
 {% endhighlight %}
 
-If you are not using the Spark shell you will also need a Spark context.
+If you are not using the Spark shell you will also need a `SparkContext`.  To learn more about
+getting started with Spark refer to the [Spark Quick Start Guide](quick-start.html).
 
 # The Property Graph
 <a name="property_graph"></a>
@@ -190,7 +191,7 @@ and `graph.edges` members respectively.
 {% highlight scala %}
 val graph: Graph[(String, String), String] // Constructed from above
 // Count all users which are postdocs
-graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc"}.count
+graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
 // Count all the edges where src > dst
 graph.edges.filter(e => e.srcId > e.dstId).count
 {% endhighlight %}
@@ -258,8 +259,10 @@ val graph: Graph[(String, String), String]
 val indDegrees: VertexRDD[Int] = graph.inDegrees
 {% endhighlight %}
 
-The reason for differentiating between core graph operations and GraphOps is to be able to support
-various graph representations in the future.
+The reason for differentiating between core graph operations and [`GraphOps`][GraphOps] is to be
+able to support different graph representations in the future.  Each graph representation must
+provide implementations of the core operations and reuse many of the useful operations defined in
+[`GraphOps`][GraphOps].
 
 ## Property Operators
 
@@ -334,14 +337,32 @@ interest or eliminate broken links. For example in the following code we remove
 [Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexID,VD)⇒Boolean):Graph[VD,ED]
 
 {% highlight scala %}
-val users: RDD[(VertexId, (String, String))]
-val edges: RDD[Edge[String]]
+// Create an RDD for the vertices
+val users: RDD[(VertexID, (String, String))] =
+  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
+                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
+                       (4L, ("peter", "student"))))
+// Create an RDD for edges
+val relationships: RDD[Edge[String]] =
+  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
+                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
+                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
 // Define a default user in case there are relationships with missing users
 val defaultUser = ("John Doe", "Missing")
 // Build the initial Graph
 val graph = Graph(users, relationships, defaultUser)
+// Notice that there is a user 0 (for which we have no information) connecting users
+// 4 (peter) and 5 (franklin).
+graph.triplets.map(
+    triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
+  ).collect.foreach(println(_))
 // Remove missing vertices as well as the edges connected to them
 val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
+// The valid subgraph will disconnect users 4 and 5 by removing user 0
+validGraph.vertices.collect.foreach(println(_))
+validGraph.triplets.map(
+    triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
+  ).collect.foreach(println(_))
 {% endhighlight %}
 
 > Note in the above example only the vertex predicate is provided.  The `subgraph` operator defaults

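As an aside on the guide change above: the example passes only the vertex predicate to `subgraph`. Below is a minimal sketch of supplying both the edge and the vertex predicate on the same toy graph, assuming a running SparkContext `sc` (e.g. the Spark shell); the "colleague" edge filter is purely illustrative and is not part of this patch.

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Same toy graph as in the guide example
val users: RDD[(VertexID, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
val graph = Graph(users, relationships, ("John Doe", "Missing"))

// Keep only "colleague" edges AND drop the placeholder vertices
val colleagueGraph = graph.subgraph(
  epred = triplet => triplet.attr == "colleague",
  vpred = (id, attr) => attr._2 != "Missing")
colleagueGraph.triplets.map(
    t => t.srcAttr._1 + " is the " + t.attr + " of " + t.dstAttr._1
  ).collect.foreach(println(_))
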
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80e4d98d/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 2b3b95e..a0a40e2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -325,8 +325,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
    *
    * @see [[org.apache.spark.graphx.lib.ConnectedComponents]]
    */
-  def connectedComponents(): Graph[VertexID, ED] = {
-    ConnectedComponents.run(graph)
+  def connectedComponents(undirected: Boolean = true): Graph[VertexID, ED] = {
+    ConnectedComponents.run(graph, undirected)
   }
 
   /**

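For reference, a hedged usage sketch of the widened GraphOps signature; `graph` here stands for any property graph (for instance the one built in the guide example above), and the value names are mine, not part of the patch.

// connectedComponents is available on any Graph[VD, ED] via the GraphOps implicits.
val ccDefault  = graph.connectedComponents()                   // undirected = true: ignore edge direction
val ccDirected = graph.connectedComponents(undirected = false) // only follow src -> dst edges
// Each vertex attribute is replaced by a component id (the smallest VertexID
// that reached it during the propagation).
ccDefault.vertices.collect.foreach(println(_))
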
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80e4d98d/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index 4a83e2d..d078d2a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -14,26 +14,42 @@ object ConnectedComponents {
    * @tparam ED the edge attribute type (preserved in the computation)
    *
    * @param graph the graph for which to compute the connected components
+   * @param undirected if true (the default), compute reachability ignoring edge direction.
    *
    * @return a graph with vertex attributes containing the smallest vertex in each
    *         connected component
    */
-  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = {
+  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], undirected: Boolean = true):
+    Graph[VertexID, ED] = {
     val ccGraph = graph.mapVertices { case (vid, _) => vid }
-
-    def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
-      if (edge.srcAttr < edge.dstAttr) {
-        Iterator((edge.dstId, edge.srcAttr))
-      } else if (edge.srcAttr > edge.dstAttr) {
-        Iterator((edge.srcId, edge.dstAttr))
-      } else {
-        Iterator.empty
+    if (undirected) {
+      def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
+        if (edge.srcAttr < edge.dstAttr) {
+          Iterator((edge.dstId, edge.srcAttr))
+        } else if (edge.srcAttr > edge.dstAttr) {
+          Iterator((edge.srcId, edge.dstAttr))
+        } else {
+          Iterator.empty
+        }
+      }
+      val initialMessage = Long.MaxValue
+      Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Both)(
+        vprog = (id, attr, msg) => math.min(attr, msg),
+        sendMsg = sendMessage,
+        mergeMsg = (a, b) => math.min(a, b))
+    } else {
+      def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
+        if (edge.srcAttr < edge.dstAttr) {
+          Iterator((edge.dstId, edge.srcAttr))
+        } else {
+          Iterator.empty
+        }
       }
+      val initialMessage = Long.MaxValue
+      Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Out)(
+        vprog = (id, attr, msg) => math.min(attr, msg),
+        sendMsg = sendMessage,
+        mergeMsg = (a, b) => math.min(a, b))
     }
-    val initialMessage = Long.MaxValue
-    Pregel(ccGraph, initialMessage)(
-      vprog = (id, attr, msg) => math.min(attr, msg),
-      sendMsg = sendMessage,
-      mergeMsg = (a, b) => math.min(a, b))
   } // end of connectedComponents
 }

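To make the difference between the two branches concrete, here is a hedged sketch on a small reverse chain (in the spirit of the suite's existing reverse-chain test), assuming a SparkContext `sc`; the outcomes described in the comments are my reading of the code above, not verified output.

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib.ConnectedComponents
import org.apache.spark.rdd.RDD

// Reverse chain 2 -> 1 -> 0: every edge points towards the smaller id.
val vertices: RDD[(VertexID, Int)] = sc.parallelize(Array((0L, 0), (1L, 0), (2L, 0)))
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(2L, 1L, 1), Edge(1L, 0L, 1)))
val chain = Graph(vertices, edges, 0)

// Undirected branch: messages flow across edges in both directions, so every
// vertex should eventually adopt component id 0.
val ccUndirected = ConnectedComponents.run(chain)

// Directed branch: a smaller label only travels from src to dst, so 2 and 1
// presumably keep their own ids and the chain is not reported as one component.
val ccDirected = ConnectedComponents.run(chain, undirected = false)
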
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80e4d98d/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
index 66612b3..86da8f1 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -80,4 +80,34 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
     }
   } // end of reverse chain connected components
 
+  test("Connected Components on a Toy Connected Graph") {
+    withSpark { sc =>
+      // Create an RDD for the vertices
+      val users: RDD[(VertexID, (String, String))] =
+        sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
+                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
+                       (4L, ("peter", "student"))))
+      // Create an RDD for edges
+      val relationships: RDD[Edge[String]] =
+        sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
+                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
+                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
+      // Edges are:
+      //   2 ---> 5 ---> 3
+      //          | \   /
+      //          V   \/
+      //   4 ---> 0    7
+      //
+      // Define a default user in case there are relationships with missing users
+      val defaultUser = ("John Doe", "Missing")
+      // Build the initial Graph
+      val graph = Graph(users, relationships, defaultUser)
+      val ccGraph = graph.connectedComponents(undirected = true)
+      val vertices = ccGraph.vertices.collect
+      for ( (id, cc) <- vertices ) {
+        assert(cc == 0)
+      }
+    }
+  } // end of toy connected components
+
 }
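
The new test exercises only the undirected default. A companion check one might add for the directed flag is sketched below (not part of this commit); it reuses the suite's withSpark helper and existing imports, and the deliberately weak assertion encodes my expectation that vertices without in-edges (2 and 4) keep their own ids, which I have not verified against a run.

  test("Connected Components on a Toy Connected Graph, following edge direction") {
    withSpark { sc =>
      val users: RDD[(VertexID, (String, String))] =
        sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
      val relationships: RDD[Edge[String]] =
        sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
      val graph = Graph(users, relationships, ("John Doe", "Missing"))
      val ccGraph = graph.connectedComponents(undirected = false)
      // Labels only travel along edge direction, so not every vertex is
      // expected to end up with component id 0.
      assert(ccGraph.vertices.map { case (id, cc) => cc }.collect.toSet.size > 1)
    }
  }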