You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2016/02/24 21:09:20 UTC

spark git commit: [SPARK-11432][GRAPHX] Personalized PageRank shouldn't use uniform initialization

Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4695eb832 -> d2c1c67cf


[SPARK-11432][GRAPHX] Personalized PageRank shouldn't use uniform initialization

Changes the personalized PageRank initialization to be non-uniform: only the source vertex starts with rank resetProb, and all other vertices start at 0.

Author: Yves Raimond <yr...@netflix.com>

Closes #9386 from moustaki/personalized-pagerank-init.

(cherry picked from commit efaa4721b511a1d29229facde6457a6dcda18966)
Signed-off-by: Xiangrui Meng <me...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2c1c67c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2c1c67c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2c1c67c

Branch: refs/heads/branch-1.5
Commit: d2c1c67cf47047d05ff8d0728ac0d3380b591c50
Parents: 4695eb8
Author: Yves Raimond <yr...@netflix.com>
Authored: Mon Nov 2 20:35:59 2015 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Wed Feb 24 12:09:17 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/graphx/lib/PageRank.scala  | 29 ++++++++++++--------
 .../apache/spark/graphx/lib/PageRankSuite.scala | 13 ++++++---
 2 files changed, 27 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d2c1c67c/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 8c0a461..52b237f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -104,18 +104,23 @@ object PageRank extends Logging {
       graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
       srcId: Option[VertexId] = None): Graph[Double, Double] =
   {
+    val personalized = srcId isDefined
+    val src: VertexId = srcId.getOrElse(-1L)
+
     // Initialize the PageRank graph with each edge attribute having
-    // weight 1/outDegree and each vertex with attribute 1.0.
+    // weight 1/outDegree and each vertex with attribute resetProb.
+    // When running personalized pagerank, only the source vertex
+    // has an attribute resetProb. All others are set to 0.
     var rankGraph: Graph[Double, Double] = graph
       // Associate the degree with each vertex
       .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
       // Set the weight on the edges based on the degree
       .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
       // Set the vertex attributes to the initial pagerank values
-      .mapVertices( (id, attr) => resetProb )
+      .mapVertices { (id, attr) =>
+        if (!(id != src && personalized)) resetProb else 0.0
+      }
 
-    val personalized = srcId isDefined
-    val src: VertexId = srcId.getOrElse(-1L)
     def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
 
     var iteration = 0
@@ -192,6 +197,9 @@ object PageRank extends Logging {
       graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
       srcId: Option[VertexId] = None): Graph[Double, Double] =
   {
+    val personalized = srcId.isDefined
+    val src: VertexId = srcId.getOrElse(-1L)
+
     // Initialize the pagerankGraph with each edge attribute
     // having weight 1/outDegree and each vertex with attribute 1.0.
     val pagerankGraph: Graph[(Double, Double), Double] = graph
@@ -202,13 +210,11 @@ object PageRank extends Logging {
       // Set the weight on the edges based on the degree
       .mapTriplets( e => 1.0 / e.srcAttr )
       // Set the vertex attributes to (initalPR, delta = 0)
-      .mapVertices( (id, attr) => (0.0, 0.0) )
+      .mapVertices { (id, attr) =>
+        if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0)
+      }
       .cache()
 
-    val personalized = srcId.isDefined
-    val src: VertexId = srcId.getOrElse(-1L)
-
-
     // Define the three functions needed to implement PageRank in the GraphX
     // version of Pregel
     def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
@@ -225,7 +231,8 @@ object PageRank extends Logging {
       teleport = oldPR*delta
 
       val newPR = teleport + (1.0 - resetProb) * msgSum
-      (newPR, newPR - oldPR)
+      val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR
+      (newPR, newDelta)
     }
 
     def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
@@ -239,7 +246,7 @@ object PageRank extends Logging {
     def messageCombiner(a: Double, b: Double): Double = a + b
 
     // The initial message received by all vertices in PageRank
-    val initialMessage = resetProb / (1.0 - resetProb)
+    val initialMessage = if (personalized) 0.0 else resetProb / (1.0 - resetProb)
 
     // Execute a dynamic version of Pregel.
     val vp = if (personalized) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d2c1c67c/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index 45f1e30..bdff314 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -109,17 +109,22 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
       assert(notMatching === 0)
 
       val staticErrors = staticRanks2.map { case (vid, pr) =>
-        val correct = (vid > 0 && pr == resetProb) ||
-          (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb *
-            (nVertices - 1)) )) < 1.0E-5)
+        val correct = (vid > 0 && pr == 0.0) ||
+          (vid == 0 && pr == resetProb)
         if (!correct) 1 else 0
       }
       assert(staticErrors.sum === 0)
 
       val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
       assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+
+      // We have one outbound edge from 1 to 0
+      val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
+        .vertices.cache()
+      val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
+      assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
     }
-  } // end of test Star PageRank
+  } // end of test Star PersonalPageRank
 
   test("Grid PageRank") {
     withSpark { sc =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org