You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@spark.apache.org by me...@apache.org on 2016/02/24 21:09:20 UTC
spark git commit: [SPARK-11432][GRAPHX] Personalized PageRank shouldn't use uniform initialization
Repository: spark
Updated Branches:
refs/heads/branch-1.5 4695eb832 -> d2c1c67cf
[SPARK-11432][GRAPHX] Personalized PageRank shouldn't use uniform initialization
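Changes the personalized PageRank initialization to be non-uniform: when running personalized PageRank, only the source vertex starts with rank resetProb, and all other vertices start at 0.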
Author: Yves Raimond <yr...@netflix.com>
Closes #9386 from moustaki/personalized-pagerank-init.
(cherry picked from commit efaa4721b511a1d29229facde6457a6dcda18966)
Signed-off-by: Xiangrui Meng <me...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2c1c67c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2c1c67c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2c1c67c
Branch: refs/heads/branch-1.5
Commit: d2c1c67cf47047d05ff8d0728ac0d3380b591c50
Parents: 4695eb8
Author: Yves Raimond <yr...@netflix.com>
Authored: Mon Nov 2 20:35:59 2015 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Wed Feb 24 12:09:17 2016 -0800
----------------------------------------------------------------------
.../org/apache/spark/graphx/lib/PageRank.scala | 29 ++++++++++++--------
.../apache/spark/graphx/lib/PageRankSuite.scala | 13 ++++++---
2 files changed, 27 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d2c1c67c/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 8c0a461..52b237f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -104,18 +104,23 @@ object PageRank extends Logging {
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
srcId: Option[VertexId] = None): Graph[Double, Double] =
{
+ val personalized = srcId isDefined
+ val src: VertexId = srcId.getOrElse(-1L)
+
// Initialize the PageRank graph with each edge attribute having
- // weight 1/outDegree and each vertex with attribute 1.0.
+ // weight 1/outDegree and each vertex with attribute resetProb.
+ // When running personalized pagerank, only the source vertex
+ // has an attribute resetProb. All others are set to 0.
var rankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
// Set the vertex attributes to the initial pagerank values
- .mapVertices( (id, attr) => resetProb )
+ .mapVertices { (id, attr) =>
+ if (!(id != src && personalized)) resetProb else 0.0
+ }
- val personalized = srcId isDefined
- val src: VertexId = srcId.getOrElse(-1L)
def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
var iteration = 0
@@ -192,6 +197,9 @@ object PageRank extends Logging {
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
srcId: Option[VertexId] = None): Graph[Double, Double] =
{
+ val personalized = srcId.isDefined
+ val src: VertexId = srcId.getOrElse(-1L)
+
// Initialize the pagerankGraph with each edge attribute
// having weight 1/outDegree and each vertex with attribute 1.0.
val pagerankGraph: Graph[(Double, Double), Double] = graph
@@ -202,13 +210,11 @@ object PageRank extends Logging {
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to (initalPR, delta = 0)
- .mapVertices( (id, attr) => (0.0, 0.0) )
+ .mapVertices { (id, attr) =>
+ if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0)
+ }
.cache()
- val personalized = srcId.isDefined
- val src: VertexId = srcId.getOrElse(-1L)
-
-
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
@@ -225,7 +231,8 @@ object PageRank extends Logging {
teleport = oldPR*delta
val newPR = teleport + (1.0 - resetProb) * msgSum
- (newPR, newPR - oldPR)
+ val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR
+ (newPR, newDelta)
}
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
@@ -239,7 +246,7 @@ object PageRank extends Logging {
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
- val initialMessage = resetProb / (1.0 - resetProb)
+ val initialMessage = if (personalized) 0.0 else resetProb / (1.0 - resetProb)
// Execute a dynamic version of Pregel.
val vp = if (personalized) {
http://git-wip-us.apache.org/repos/asf/spark/blob/d2c1c67c/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index 45f1e30..bdff314 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -109,17 +109,22 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
assert(notMatching === 0)
val staticErrors = staticRanks2.map { case (vid, pr) =>
- val correct = (vid > 0 && pr == resetProb) ||
- (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb *
- (nVertices - 1)) )) < 1.0E-5)
+ val correct = (vid > 0 && pr == 0.0) ||
+ (vid == 0 && pr == resetProb)
if (!correct) 1 else 0
}
assert(staticErrors.sum === 0)
val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+
+ // We have one outbound edge from 1 to 0
+ val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
+ .vertices.cache()
+ val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
+ assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
}
- } // end of test Star PageRank
+ } // end of test Star PersonalPageRank
test("Grid PageRank") {
withSpark { sc =>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org