You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 08:00:07 UTC

[43/50] git commit: Wrap methods in the appropriate class/object declaration

Wrap methods in the appropriate class/object declaration


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6f6f8c92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6f6f8c92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6f6f8c92

Branch: refs/heads/master
Commit: 6f6f8c928ce493357d4d32e46971c5e401682ea8
Parents: 67795db
Author: Ankur Dave <an...@gmail.com>
Authored: Mon Jan 13 21:55:35 2014 -0800
Committer: Ankur Dave <an...@gmail.com>
Committed: Mon Jan 13 21:55:35 2014 -0800

----------------------------------------------------------------------
 docs/graphx-programming-guide.md | 149 +++++++++++++++++++---------------
 1 file changed, 85 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6f6f8c92/docs/graphx-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index aadeb38..29d397c 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -256,7 +256,7 @@ compute the in-degree of each vertex (defined in `GraphOps`) by the following:
 {% highlight scala %}
 val graph: Graph[(String, String), String]
 // Use the implicit GraphOps.inDegrees operator
-val indDegrees: VertexRDD[Int] = graph.inDegrees
+val inDegrees: VertexRDD[Int] = graph.inDegrees
 {% endhighlight %}
 
 The reason for differentiating between core graph operations and [`GraphOps`][GraphOps] is to be
@@ -270,9 +270,11 @@ In direct analogy to the RDD `map` operator, the property
 graph contains the following:
 
 {% highlight scala %}
-def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
-def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
-def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
+class Graph[VD, ED] {
+  def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
+  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
+  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
+}
 {% endhighlight %}
 
 Each of these operators yields a new graph with the vertex or edge properties modified by the user
@@ -314,11 +316,13 @@ Currently GraphX supports only a simple set of commonly used structural operator
 add more in the future.  The following is a list of the basic structural operators.
 
 {% highlight scala %}
-def reverse: Graph[VD, ED]
-def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
-             vpred: (VertexID, VD) => Boolean): Graph[VD, ED]
-def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
-def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
+class Graph[VD, ED] {
+  def reverse: Graph[VD, ED]
+  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
+               vpred: (VertexID, VD) => Boolean): Graph[VD, ED]
+  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
+  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
+}
 {% endhighlight %}
 
 The [`reverse`][Graph.reverse] operator returns a new graph with all the edge directions reversed.
@@ -400,10 +404,12 @@ might want to pull vertex properties from one graph into another.  These tasks c
 using the *join* operators. Below we list the key join operators:
 
 {% highlight scala %}
-def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD)
-  : Graph[VD, ED]
-def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2)
-  : Graph[VD2, ED]
+class Graph[VD, ED] {
+  def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD)
+    : Graph[VD, ED]
+  def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2)
+    : Graph[VD2, ED]
+}
 {% endhighlight %}
 
 The [`joinVertices`][GraphOps.joinVertices] operator joins the vertices with the input RDD and
@@ -470,10 +476,12 @@ The core (heavily optimized) aggregation primitive in GraphX is the
 [`mapReduceTriplets`][Graph.mapReduceTriplets] operator:
 
 {% highlight scala %}
-def mapReduceTriplets[A](
-    map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
-    reduce: (A, A) => A)
-  : VertexRDD[A]
+class Graph[VD, ED] {
+  def mapReduceTriplets[A](
+      map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+      reduce: (A, A) => A)
+    : VertexRDD[A]
+}
 {% endhighlight %}
 
 The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which
@@ -564,12 +572,19 @@ val maxDegrees: (VertexID, Int)   = graph.degrees.reduce(max)
 ### Collecting Neighbors
 
 In some cases it may be easier to express computation by collecting neighboring vertices and their
-attributes at each vertex. This can be easily accomplished using the `collectNeighborIds` and the
-`collectNeighbors` operators.
+attributes at each vertex. This can be easily accomplished using the
+[`collectNeighborIds`][GraphOps.collectNeighborIds] and the
+[`collectNeighbors`][GraphOps.collectNeighbors] operators.
+
+[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexID]]
+[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexID,VD)]]
+
 
 {% highlight scala %}
-def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] =
-def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ]
+class GraphOps[VD, ED] {
+  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
+  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ]
+}
 {% endhighlight %}
 
 > Note that these operators can be quite costly as they duplicate information and require
@@ -600,40 +615,44 @@ messages remaining.
 > neighboring vertices and the message construction is done in parallel using a user defined
 > messaging function.  These constraints allow additional optimization within GraphX.
 
-The following is type signature of the Pregel operator as well as a *sketch* of its implementation
-(note calls to graph.cache have been removed):
+The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch*
+of its implementation (note calls to graph.cache have been removed):
+
+[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexID,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexID,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
 
 {% highlight scala %}
-def pregel[A]
-    (initialMsg: A,
-     maxIter: Int = Int.MaxValue,
-     activeDir: EdgeDirection = EdgeDirection.Out)
-    (vprog: (VertexID, VD, A) => VD,
-     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
-     mergeMsg: (A, A) => A)
-  : Graph[VD, ED] = {
-  // Receive the initial message at each vertex
-  var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
-  // compute the messages
-  var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
-  var activeMessages = messages.count()
-  // Loop until no messages remain or maxIterations is achieved
-  var i = 0
-  while (activeMessages > 0 && i < maxIterations) {
-    // Receive the messages: -----------------------------------------------------------------------
-    // Run the vertex program on all vertices that receive messages
-    val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
-    // Merge the new vertex values back into the graph
-    g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
-    // Send Messages: ------------------------------------------------------------------------------
-    // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
-    // get to send messages.  More precisely the map phase of mapReduceTriplets is only invoked
-    // on edges in the activeDir of vertices in newVerts
-    messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
-    activeMessages = messages.count()
-    i += 1
+class GraphOps[VD, ED] {
+  def pregel[A]
+      (initialMsg: A,
+       maxIter: Int = Int.MaxValue,
+       activeDir: EdgeDirection = EdgeDirection.Out)
+      (vprog: (VertexID, VD, A) => VD,
+       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+       mergeMsg: (A, A) => A)
+    : Graph[VD, ED] = {
+    // Receive the initial message at each vertex
+    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
+    // compute the messages
+    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
+    var activeMessages = messages.count()
+    // Loop until no messages remain or maxIter is reached
+    var i = 0
+    while (activeMessages > 0 && i < maxIter) {
+      // Receive the messages: -----------------------------------------------------------------------
+      // Run the vertex program on all vertices that receive messages
+      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
+      // Merge the new vertex values back into the graph
+      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
+      // Send Messages: ------------------------------------------------------------------------------
+      // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
+      // get to send messages.  More precisely the map phase of mapReduceTriplets is only invoked
+      // on edges in the activeDir of vertices in newVerts
+      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
+      activeMessages = messages.count()
+      i += 1
+    }
+    g
   }
-  g
 }
 {% endhighlight %}
 
@@ -749,18 +768,20 @@ time without hash evaluations. To leverage this indexed data-structure, the `Ver
 following additional functionality:
 
 {% highlight scala %}
-// Filter the vertex set but preserves the internal index
-def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
-// Transform the values without changing the ids (preserves the internal index)
-def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
-def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
-// Remove vertices from this set that appear in the other set
-def diff(other: VertexRDD[VD]): VertexRDD[VD]
-// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
-def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
-def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
-// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
-def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
+class VertexRDD[VD] {
+  // Filter the vertex set but preserve the internal index
+  def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
+  // Transform the values without changing the ids (preserves the internal index)
+  def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
+  def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
+  // Remove vertices from this set that appear in the other set
+  def diff(other: VertexRDD[VD]): VertexRDD[VD]
+  // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
+  def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
+  def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
+  // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
+  def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
+}
 {% endhighlight %}
 
 Notice, for example, how the `filter` operator returns a `VertexRDD`.  Filter is actually