Posted to commits@spark.apache.org by jo...@apache.org on 2015/05/01 20:57:36 UTC

spark git commit: [SPARK-5854] personalized page rank

Repository: spark
Updated Branches:
  refs/heads/master 27de6fef6 -> 7d427222d


[SPARK-5854] personalized page rank

Here's a modification to PageRank that implements personalized PageRank. The approach is similar to the one outlined by Bahmani et al. (2010, http://arxiv.org/pdf/1006.2880.pdf).

I'm sure this needs tuning up or other considerations, so let me know how I can improve this.
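
For quick reference, here is a minimal usage sketch of the API this patch adds
(assuming a SparkContext named sc is in scope; the edge data is made up for
illustration):

    import org.apache.spark.graphx._

    // A tiny directed cycle: 1 -> 2 -> 3 -> 1
    val rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L)))
    val graph = Graph.fromEdgeTuples(rawEdges, 1.0)

    // Rank all vertices relative to random walks that restart at vertex 1
    val ranks = graph.personalizedPageRank(src = 1L, tol = 0.0001).vertices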

Author: Dan McClary <da...@gmail.com>
Author: dwmclary <da...@gmail.com>

Closes #4774 from dwmclary/SPARK-5854-Personalized-PageRank and squashes the following commits:

8b907db [dwmclary] fixed scalastyle errors in PageRankSuite
2c20e5d [dwmclary] merged with upstream master
d6cebac [dwmclary] updated as per style requests
7d00c23 [Dan McClary] fixed line overrun in personalizedVertexPageRank
d711677 [Dan McClary] updated vertexProgram to restore binary compatibility for inner method
bb8d507 [Dan McClary] Merge branch 'master' of https://github.com/apache/spark into SPARK-5854-Personalized-PageRank
fba0edd [Dan McClary] fixed silly mistakes
de51be2 [Dan McClary] cleaned up whitespace between comments and methods
0c30d0c [Dan McClary] updated to maintain binary compatibility
aaf0b4b [Dan McClary] Merge branch 'master' of https://github.com/apache/spark into SPARK-5854-Personalized-PageRank
76773f6 [Dan McClary] Merge branch 'master' of https://github.com/apache/spark into SPARK-5854-Personalized-PageRank
44ada8e [Dan McClary] updated tolerance on chain PPR
1ffed95 [Dan McClary] updated tolerance on chain PPR
b67ac69 [Dan McClary] updated tolerance on chain PPR
a560942 [Dan McClary] rolled PPR into pregel code for PageRank
6dc2c29 [Dan McClary] initial implementation of personalized page rank


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d427222
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d427222
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d427222

Branch: refs/heads/master
Commit: 7d427222dca4807ec55e8d9a7de6ffe861cd0d24
Parents: 27de6fe
Author: Dan McClary <da...@gmail.com>
Authored: Fri May 1 11:55:43 2015 -0700
Committer: Joseph Gonzalez <jo...@gmail.com>
Committed: Fri May 1 11:55:43 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/graphx/GraphOps.scala      | 25 ++++++
 .../org/apache/spark/graphx/lib/PageRank.scala  | 93 ++++++++++++++++++--
 .../apache/spark/graphx/lib/PageRankSuite.scala | 47 ++++++++++
 3 files changed, 159 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d427222/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 86f611d..7edd627 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -372,6 +372,31 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
     PageRank.runUntilConvergence(graph, tol, resetProb)
   }
 
+
+  /**
+   * Run personalized PageRank for a given vertex, such that all random
+   * walks restart at the source vertex.
+   *
+   * @see [[org.apache.spark.graphx.lib.PageRank$#runUntilConvergenceWithOptions]]
+   */
+  def personalizedPageRank(src: VertexId, tol: Double,
+    resetProb: Double = 0.15): Graph[Double, Double] = {
+    PageRank.runUntilConvergenceWithOptions(graph, tol, resetProb, Some(src))
+  }
+
+  /**
+   * Run personalized PageRank for a fixed number of iterations, with
+   * all random walks originating at the source node, returning a graph
+   * with vertex attributes containing the PageRank and edge attributes
+   * containing the normalized edge weight.
+   *
+   * @see [[org.apache.spark.graphx.lib.PageRank$#runWithOptions]]
+   */
+  def staticPersonalizedPageRank(src: VertexId, numIter: Int,
+    resetProb: Double = 0.15): Graph[Double, Double] = {
+    PageRank.runWithOptions(graph, numIter, resetProb, Some(src))
+  }
+
   /**
    * Run PageRank for a fixed number of iterations returning a graph with vertex attributes
    * containing the PageRank and edge attributes the normalized edge weight.
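
A note on the two entry points added above: personalizedPageRank iterates until
the per-vertex rank change falls below tol, while staticPersonalizedPageRank runs
exactly numIter iterations. A sketch, reusing the hypothetical graph from the
usage example in the commit message:

    // Dynamic: converge to a tolerance
    val dynamicRanks = graph.personalizedPageRank(src = 1L, tol = 0.0001)

    // Static: run a fixed number of iterations
    val staticRanks = graph.staticPersonalizedPageRank(src = 1L, numIter = 20)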

http://git-wip-us.apache.org/repos/asf/spark/blob/7d427222/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 042e366..bc974b2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.graphx.lib
 
 import scala.reflect.ClassTag
+import scala.language.postfixOps
 
 import org.apache.spark.Logging
 import org.apache.spark.graphx._
@@ -60,6 +61,7 @@ import org.apache.spark.graphx._
  */
 object PageRank extends Logging {
 
+
   /**
    * Run PageRank for a fixed number of iterations returning a graph
    * with vertex attributes containing the PageRank and edge
@@ -74,10 +76,33 @@ object PageRank extends Logging {
    *
   * @return the graph with each vertex attribute containing the PageRank and each edge
   *         attribute containing the normalized weight.
+   */
+  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int,
+    resetProb: Double = 0.15): Graph[Double, Double] =
+  {
+    runWithOptions(graph, numIter, resetProb)
+  }
+
+  /**
+   * Run PageRank for a fixed number of iterations returning a graph
+   * with vertex attributes containing the PageRank and edge
+   * attributes containing the normalized edge weight.
+   *
+   * @tparam VD the original vertex attribute (not used)
+   * @tparam ED the original edge attribute (not used)
+   *
+   * @param graph the graph on which to compute PageRank
+   * @param numIter the number of iterations of PageRank to run
+   * @param resetProb the random reset probability (alpha)
+   * @param srcId the source vertex for a Personalized Page Rank (optional)
+   *
+   * @return the graph with each vertex attribute containing the PageRank and each edge
+   *         attribute containing the normalized weight.
    *
    */
-  def run[VD: ClassTag, ED: ClassTag](
-      graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
+  def runWithOptions[VD: ClassTag, ED: ClassTag](
+      graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
+      srcId: Option[VertexId] = None): Graph[Double, Double] =
   {
     // Initialize the PageRank graph with each edge attribute having
     // weight 1/outDegree and each vertex with attribute 1.0.
@@ -89,6 +114,10 @@ object PageRank extends Logging {
       // Set the vertex attributes to the initial pagerank values
       .mapVertices( (id, attr) => resetProb )
 
+    val personalized = srcId.isDefined
+    val src: VertexId = srcId.getOrElse(-1L)
+    def delta(u: VertexId, v: VertexId): Double = if (u == v) 1.0 else 0.0
+
     var iteration = 0
     var prevRankGraph: Graph[Double, Double] = null
     while (iteration < numIter) {
@@ -103,8 +132,14 @@ object PageRank extends Logging {
       // that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
       // edge partitions.
       prevRankGraph = rankGraph
+      val rPrb = if (personalized) {
+        (src: VertexId, id: VertexId) => resetProb * delta(src, id)
+      } else {
+        (src: VertexId, id: VertexId) => resetProb
+      }
+
       rankGraph = rankGraph.joinVertices(rankUpdates) {
-        (id, oldRank, msgSum) => resetProb + (1.0 - resetProb) * msgSum
+        (id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
       }.cache()
 
       rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
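
To make the personalized update above concrete: each iteration computes

    newRank(id) = rPrb(src, id) + (1.0 - resetProb) * msgSum
                = resetProb * delta(src, id) + (1.0 - resetProb) * msgSum

so only the source vertex receives the teleport mass resetProb; all other
vertices are ranked purely by incoming walk mass. A quick numeric check with
illustrative values resetProb = 0.15 and msgSum = 0.5:

    0.15 * 1.0 + 0.85 * 0.5 = 0.575   // at the source vertex
    0.15 * 0.0 + 0.85 * 0.5 = 0.425   // at any other vertex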
@@ -133,7 +168,29 @@ object PageRank extends Logging {
    *         containing the normalized weight.
    */
   def runUntilConvergence[VD: ClassTag, ED: ClassTag](
-      graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
+    graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
+  {
+    runUntilConvergenceWithOptions(graph, tol, resetProb)
+  }
+
+  /**
+   * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
+   * PageRank and edge attributes containing the normalized edge weight.
+   *
+   * @tparam VD the original vertex attribute (not used)
+   * @tparam ED the original edge attribute (not used)
+   *
+   * @param graph the graph on which to compute PageRank
+   * @param tol the tolerance allowed at convergence (smaller => more accurate).
+   * @param resetProb the random reset probability (alpha)
+   * @param srcId the source vertex for a Personalized Page Rank (optional)
+   *
+   * @return the graph with each vertex attribute containing the PageRank and each edge
+   *         attribute containing the normalized weight.
+   */
+  def runUntilConvergenceWithOptions[VD: ClassTag, ED: ClassTag](
+      graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
+      srcId: Option[VertexId] = None): Graph[Double, Double] =
   {
     // Initialize the pagerankGraph with each edge attribute
     // having weight 1/outDegree and each vertex with attribute 1.0.
@@ -148,6 +205,10 @@ object PageRank extends Logging {
       .mapVertices( (id, attr) => (0.0, 0.0) )
       .cache()
 
+    val personalized = srcId.isDefined
+    val src: VertexId = srcId.getOrElse(-1L)
+
     // Define the three functions needed to implement PageRank in the GraphX
     // version of Pregel
     def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
@@ -156,7 +217,18 @@ object PageRank extends Logging {
       (newPR, newPR - oldPR)
     }
 
-    def sendMessage(edge: EdgeTriplet[(Double, Double), Double]): Iterator[(VertexId, Double)] = {
+    def personalizedVertexProgram(id: VertexId, attr: (Double, Double),
+      msgSum: Double): (Double, Double) = {
+      val (oldPR, lastDelta) = attr
+      // Only the source vertex retains its teleport mass; all others get none.
+      val delta = if (src == id) 1.0 else 0.0
+      val teleport = oldPR * delta
+      val newPR = teleport + (1.0 - resetProb) * msgSum
+      (newPR, newPR - oldPR)
+    }
+
+    def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
       if (edge.srcAttr._2 > tol) {
         Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
       } else {
@@ -170,8 +242,17 @@ object PageRank extends Logging {
     val initialMessage = resetProb / (1.0 - resetProb)
 
     // Execute a dynamic version of Pregel.
+    val vp = if (personalized) {
+      (id: VertexId, attr: (Double, Double), msgSum: Double) =>
+        personalizedVertexProgram(id, attr, msgSum)
+    } else {
+      (id: VertexId, attr: (Double, Double), msgSum: Double) =>
+        vertexProgram(id, attr, msgSum)
+    }
+
     Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
-      vertexProgram, sendMessage, messageCombiner)
+      vp, sendMessage, messageCombiner)
       .mapVertices((vid, attr) => attr._1)
   } // end of deltaPageRank
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d427222/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index 95804b0..3f3c9df 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -92,6 +92,36 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
     }
   } // end of test Star PageRank
 
+  test("Star PersonalPageRank") {
+    withSpark { sc =>
+      val nVertices = 100
+      val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
+      val resetProb = 0.15
+      val errorTol = 1.0e-5
+
+      val staticRanks1 = starGraph.staticPersonalizedPageRank(0, numIter = 1, resetProb).vertices
+      val staticRanks2 = starGraph.staticPersonalizedPageRank(0, numIter = 2, resetProb)
+        .vertices.cache()
+
+      // Static personalized PageRank should only take 2 iterations to converge
+      val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
+        if (pr1 != pr2) 1 else 0
+      }.map { case (vid, test) => test }.sum
+      assert(notMatching === 0)
+
+      val staticErrors = staticRanks2.map { case (vid, pr) =>
+        val correct = (vid > 0 && pr == resetProb) ||
+          (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb *
+            (nVertices - 1)))) < 1.0E-5)
+        if (!correct) 1 else 0
+      }
+      assert(staticErrors.sum === 0)
+
+      val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
+      assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+    }
+  } // end of test Star PersonalPageRank
+
   test("Grid PageRank") {
     withSpark { sc =>
       val rows = 10
@@ -128,4 +158,21 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
       assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
     }
   }
+
+  test("Chain PersonalizedPageRank") {
+    withSpark { sc =>
+      val chain1 = (0 until 9).map(x => (x, x + 1) )
+      val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong) }
+      val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+      val resetProb = 0.15
+      val tol = 0.0001
+      val numIter = 10
+      val errorTol = 1.0e-1
+
+      val staticRanks = chain.staticPersonalizedPageRank(4, numIter, resetProb).vertices
+      val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices
+
+      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+    }
+  }
 }
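
As a sanity check on the closed form asserted in the Star PersonalPageRank test:
spokes receive no messages on a star graph whose edges all point at vertex 0, so
they keep their initial rank of resetProb = 0.15, and after one step the center
(the source) collects

    resetProb + (1.0 - resetProb) * resetProb * (nVertices - 1)
      = 0.15 + 0.85 * 0.15 * 99
      = 12.7725

which is exactly what the staticErrors assertion checks to within 1.0E-5.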


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org