You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/09/12 23:08:42 UTC

git commit: [SPARK-3427] [GraphX] Avoid active vertex tracking in static PageRank

Repository: spark
Updated Branches:
  refs/heads/master eae81b0bf -> 15a564598


[SPARK-3427] [GraphX] Avoid active vertex tracking in static PageRank

GraphX's current implementation of static (fixed iteration count) PageRank uses the Pregel API. This unnecessarily tracks active vertices, even though in static PageRank all vertices are always active. Active vertex tracking incurs the following costs:

1. A shuffle per iteration to ship the active sets to the edge partitions.
2. A hash table creation per iteration at each partition to index the active sets for lookup.
3. A hash lookup per edge to check whether the source vertex is active.

I reimplemented static PageRank using the lower-level GraphX API instead of the Pregel API. In benchmarks on a 16-node m2.4xlarge cluster, this provided a 23% speedup (from 514 s to 397 s, mean over 3 trials) for 10 iterations of PageRank on a synthetic graph with 10M vertices and 1.27B edges.

Author: Ankur Dave <an...@gmail.com>

Closes #2308 from ankurdave/SPARK-3427 and squashes the following commits:

449996a [Ankur Dave] Avoid unnecessary active vertex tracking in static PageRank


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15a56459
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15a56459
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15a56459

Branch: refs/heads/master
Commit: 15a564598fe63003652b1e24527c432080b5976c
Parents: eae81b0
Author: Ankur Dave <an...@gmail.com>
Authored: Fri Sep 12 14:08:38 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Fri Sep 12 14:08:38 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/graphx/lib/PageRank.scala  | 45 +++++++++++++-------
 1 file changed, 29 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/15a56459/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 614555a..257e2f3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -79,30 +79,43 @@ object PageRank extends Logging {
   def run[VD: ClassTag, ED: ClassTag](
       graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
   {
-    // Initialize the pagerankGraph with each edge attribute having
+    // Initialize the PageRank graph with each edge attribute having
     // weight 1/outDegree and each vertex with attribute 1.0.
-    val pagerankGraph: Graph[Double, Double] = graph
+    var rankGraph: Graph[Double, Double] = graph
       // Associate the degree with each vertex
       .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
       // Set the weight on the edges based on the degree
       .mapTriplets( e => 1.0 / e.srcAttr )
       // Set the vertex attributes to the initial pagerank values
-      .mapVertices( (id, attr) => 1.0 )
-      .cache()
+      .mapVertices( (id, attr) => resetProb )
 
-    // Define the three functions needed to implement PageRank in the GraphX
-    // version of Pregel
-    def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
-      resetProb + (1.0 - resetProb) * msgSum
-    def sendMessage(edge: EdgeTriplet[Double, Double]) =
-      Iterator((edge.dstId, edge.srcAttr * edge.attr))
-    def messageCombiner(a: Double, b: Double): Double = a + b
-    // The initial message received by all vertices in PageRank
-    val initialMessage = 0.0
+    var iteration = 0
+    var prevRankGraph: Graph[Double, Double] = null
+    while (iteration < numIter) {
+      rankGraph.cache()
 
-    // Execute pregel for a fixed number of iterations.
-    Pregel(pagerankGraph, initialMessage, numIter, activeDirection = EdgeDirection.Out)(
-      vertexProgram, sendMessage, messageCombiner)
+      // Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
+      // do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
+      val rankUpdates = rankGraph.mapReduceTriplets[Double](
+        e => Iterator((e.dstId, e.srcAttr * e.attr)), _ + _)
+
+      // Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
+      // that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
+      // edge partitions.
+      prevRankGraph = rankGraph
+      rankGraph = rankGraph.joinVertices(rankUpdates) {
+        (id, oldRank, msgSum) => resetProb + (1.0 - resetProb) * msgSum
+      }.cache()
+
+      rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
+      logInfo(s"PageRank finished iteration $iteration.")
+      prevRankGraph.vertices.unpersist(false)
+      prevRankGraph.edges.unpersist(false)
+
+      iteration += 1
+    }
+
+    rankGraph
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org