Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:59:59 UTC

[35/50] git commit: Finished documenting vertexrdd.

Finished documenting vertexrdd.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ee8931d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ee8931d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ee8931d2

Branch: refs/heads/master
Commit: ee8931d2c6503716de640d6d1249c515e1fd85d3
Parents: 0fbc0b0
Author: Joseph E. Gonzalez <jo...@gmail.com>
Authored: Mon Jan 13 19:30:25 2014 -0800
Committer: Joseph E. Gonzalez <jo...@gmail.com>
Committed: Mon Jan 13 19:30:35 2014 -0800

----------------------------------------------------------------------
 docs/graphx-programming-guide.md | 53 +++++++++++++++++++++++++++++++++++
 1 file changed, 53 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ee8931d2/docs/graphx-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 77d8078..76de26c 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -683,7 +683,60 @@ val sssp = initialGraph.pregel(Double.PositiveInfinity)(
 # Vertex and Edge RDDs
 <a name="vertex_and_edge_rdds"></a>
 
+GraphX exposes `RDD` views of the vertices and edges stored within the graph.  However, because
+GraphX maintains the vertices and edges in optimized data-structures and these data-structures
+provide additional functionality, the vertices and edges are returned as `VertexRDD` and `EdgeRDD`,
+respectively.  In this section we review some of the additional useful functionality in these types.
 
+## VertexRDDs
+
+The `VertexRDD[A]` extends the more traditional `RDD[(VertexID, A)]` but adds the
+constraint that each `VertexID` occurs only *once*.  Moreover, `VertexRDD[A]` represents a *set* of
+vertices, each with an attribute of type `A`.  Internally, this is achieved by storing the vertex
+attributes in a reusable hash-map data-structure.  As a consequence, if two `VertexRDD`s are derived
+from the same base `VertexRDD` (e.g., by `filter` or `mapValues`), they can be joined in constant
+time without hash evaluations.  To leverage this indexed data-structure, the `VertexRDD` exposes the
+following additional functionality:
+
+{% highlight scala %}
+// Filter the vertex set but preserve the internal index
+def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
+// Transform the values without changing the ids (preserves the internal index)
+def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
+def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
+// Remove vertices from this set that appear in the other set
+def diff(other: VertexRDD[VD]): VertexRDD[VD]
+// Join operators that take advantage of the internal indexing to substantially accelerate joins
+def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
+def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
+// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
+def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
+{% endhighlight %}
+
+Notice, for example, how the `filter` operator returns a `VertexRDD`.  Filter is actually
+implemented using a `BitSet`, thereby reusing the index and preserving the ability to do fast joins
+with other `VertexRDD`s.  Likewise, the `mapValues` operators do not allow the `map` function to
+change the `VertexID`, thereby enabling the same `HashMap` data-structures to be reused.  Both the
+`leftJoin` and `innerJoin` are able to identify when joining two `VertexRDD`s derived from the same
+`HashMap` and implement the join by linear scan rather than costly point lookups.
+
+The `aggregateUsingIndex` operator can be slightly confusing but is also useful for efficient
+construction of a new `VertexRDD` from an `RDD[(VertexID, A)]`.  Conceptually, if I have constructed
+a `VertexRDD[B]` over a set of vertices, *which is a super-set* of the vertices in some
+`RDD[(VertexID, A)]`, then I can reuse the index to both aggregate and subsequently index the
+RDD.  For example:
+
+{% highlight scala %}
+val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
+val rddB: RDD[(VertexID, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
+// There should be 200 entries in rddB
+rddB.count
+val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
+// There should be 100 entries in setB
+setB.count
+// Joining A and B should now be fast!
+val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
+{% endhighlight %}
 
 # Optimized Representation