Posted to commits@spark.apache.org by sr...@apache.org on 2016/02/15 10:20:51 UTC

spark git commit: [SPARK-12995][GRAPHX] Remove deprecated APIs from Pregel

Repository: spark
Updated Branches:
  refs/heads/master a8bbc4f50 -> 56d49397e


[SPARK-12995][GRAPHX] Remove deprecated APIs from Pregel

Author: Takeshi YAMAMURO <li...@gmail.com>

Closes #10918 from maropu/RemoveDeprecateInPregel.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56d49397
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56d49397
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56d49397

Branch: refs/heads/master
Commit: 56d49397e01306637edf23bbb4f3b9d396cdc6ff
Parents: a8bbc4f
Author: Takeshi YAMAMURO <li...@gmail.com>
Authored: Mon Feb 15 09:20:49 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Feb 15 09:20:49 2016 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/Graph.scala   | 49 ------------------
 .../org/apache/spark/graphx/GraphXUtils.scala   | 27 ++++++++++
 .../scala/org/apache/spark/graphx/Pregel.scala  |  6 +--
 .../apache/spark/graphx/impl/GraphImpl.scala    | 25 ----------
 .../apache/spark/graphx/lib/SVDPlusPlus.scala   | 11 -----
 .../org/apache/spark/graphx/GraphSuite.scala    | 52 +++-----------------
 project/MimaExcludes.scala                      |  6 +++
 7 files changed, 42 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/56d49397/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 869caa3..fe884d0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -341,55 +341,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
 
   /**
-   * Aggregates values from the neighboring edges and vertices of each vertex.  The user supplied
-   * `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be
-   * "sent" to either vertex in the edge.  The `reduceFunc` is then used to combine the output of
-   * the map phase destined to each vertex.
-   *
-   * This function is deprecated in 1.2.0 because of SPARK-3936. Use aggregateMessages instead.
-   *
-   * @tparam A the type of "message" to be sent to each vertex
-   *
-   * @param mapFunc the user defined map function which returns 0 or
-   * more messages to neighboring vertices
-   *
-   * @param reduceFunc the user defined reduce function which should
-   * be commutative and associative and is used to combine the output
-   * of the map phase
-   *
-   * @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
-   * desired. This is done by specifying a set of "active" vertices and an edge direction. The
-   * `sendMsg` function will then run only on edges connected to active vertices by edges in the
-   * specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
-   * destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
-   * originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
-   * run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
-   * will be run on edges with *both* vertices in the active set. The active set must have the
-   * same index as the graph's vertices.
-   *
-   * @example We can use this function to compute the in-degree of each
-   * vertex
-   * {{{
-   * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
-   * val inDeg: RDD[(VertexId, Int)] =
-   *   mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _)
-   * }}}
-   *
-   * @note By expressing computation at the edge level we achieve
-   * maximum parallelism.  This is one of the core functions in the
-   * Graph API in that enables neighborhood level computation. For
-   * example this function can be used to count neighbors satisfying a
-   * predicate or implement PageRank.
-   *
-   */
-  @deprecated("use aggregateMessages", "1.2.0")
-  def mapReduceTriplets[A: ClassTag](
-      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
-      reduceFunc: (A, A) => A,
-      activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
-    : VertexRDD[A]
-
-  /**
    * Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
    * `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
    * sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
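
With the method gone, the in-degree example in the removed scaladoc maps one-to-one onto aggregateMessages. A minimal migration sketch (the old example's Graph.textFile loader was never a real API, so the graph here is assumed to be built elsewhere):

    // Hedged sketch: the removed in-degree example, ported to aggregateMessages.
    // Old form: graph.mapReduceTriplets[Int](et => Iterator((et.dstId, 1)), _ + _)
    import org.apache.spark.graphx._

    def inDegreeOf[VD, ED](graph: Graph[VD, ED]): VertexRDD[Int] =
      graph.aggregateMessages[Int](
        ctx => ctx.sendToDst(1), // one message per edge, sent to its destination
        _ + _,                   // sum the messages arriving at each vertex
        TripletFields.None)      // no vertex attributes are read, so none are shipped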

http://git-wip-us.apache.org/repos/asf/spark/blob/56d49397/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala
index 8ec33e1..ef0b943 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.graphx
 
+import scala.reflect.ClassTag
+
 import org.apache.spark.SparkConf
 import org.apache.spark.graphx.impl._
 import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
@@ -24,6 +26,7 @@ import org.apache.spark.util.BoundedPriorityQueue
 import org.apache.spark.util.collection.{BitSet, OpenHashSet}
 
 object GraphXUtils {
+
   /**
    * Registers classes that GraphX uses with Kryo.
    */
@@ -42,4 +45,28 @@ object GraphXUtils {
       classOf[OpenHashSet[Int]],
       classOf[OpenHashSet[Long]]))
   }
+
+  /**
+   * A proxy method to map the obsolete API to the new one.
+   */
+  private[graphx] def mapReduceTriplets[VD: ClassTag, ED: ClassTag, A: ClassTag](
+      g: Graph[VD, ED],
+      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
+      reduceFunc: (A, A) => A,
+      activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {
+    def sendMsg(ctx: EdgeContext[VD, ED, A]) {
+      mapFunc(ctx.toEdgeTriplet).foreach { kv =>
+        val id = kv._1
+        val msg = kv._2
+        if (id == ctx.srcId) {
+          ctx.sendToSrc(msg)
+        } else {
+          assert(id == ctx.dstId)
+          ctx.sendToDst(msg)
+        }
+      }
+    }
+    g.aggregateMessagesWithActiveSet(
+      sendMsg, reduceFunc, TripletFields.All, activeSetOpt)
+  }
 }
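
The proxy preserves the old Iterator-based contract: each (VertexId, A) pair emitted by mapFunc must target an endpoint of the current edge, which sendMsg enforces with the assert before routing to sendToSrc or sendToDst. Being private[graphx], it is reachable only from GraphX-internal code such as Pregel and the test suite. A sketch of such an internal call, assuming a Graph[Int, Int] named graph:

    // Hedged sketch: sum each vertex's neighbors' attributes through the proxy,
    // exactly as the removed public API expressed it.
    val neighborSums: VertexRDD[Int] = GraphXUtils.mapReduceTriplets[Int, Int, Int](
      graph,
      et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
      (a, b) => a + b)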

http://git-wip-us.apache.org/repos/asf/spark/blob/56d49397/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 7960827..3ba73b4 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -121,7 +121,7 @@ object Pregel extends Logging {
   {
     var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
     // compute the messages
-    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
+    var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
     var activeMessages = messages.count()
     // Loop
     var prevG: Graph[VD, ED] = null
@@ -135,8 +135,8 @@ object Pregel extends Logging {
       // Send new messages, skipping edges where neither side received a message. We must cache
       // messages so it can be materialized on the next line, allowing us to uncache the previous
       // iteration.
-      messages = g.mapReduceTriplets(
-        sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
+      messages = GraphXUtils.mapReduceTriplets(
+        g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
       // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
       // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
       // and the vertices of g).
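
Pregel's public signature still takes an Iterator-returning sendMsg over EdgeTriplet, which is why it goes through the proxy rather than calling aggregateMessages directly; user programs are unaffected by this commit. For reference, the standard single-source shortest-paths formulation against this unchanged API, assuming graph: Graph[Long, Double] and a sourceId:

    // Hedged sketch: single-source shortest paths on the unchanged Pregel API.
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = Pregel(initialGraph, Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist),  // vprog: keep the best distance
      triplet =>                                       // sendMsg: relax each edge
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        },
      (a, b) => math.min(a, b))                        // mergeMsg: keep the smaller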

http://git-wip-us.apache.org/repos/asf/spark/blob/56d49397/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 81182ad..c5cb533 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -187,31 +187,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   // Lower level transformation methods
   // ///////////////////////////////////////////////////////////////////////////////////////////////
 
-  override def mapReduceTriplets[A: ClassTag](
-      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
-      reduceFunc: (A, A) => A,
-      activeSetOpt: Option[(VertexRDD[_], EdgeDirection)]): VertexRDD[A] = {
-
-    def sendMsg(ctx: EdgeContext[VD, ED, A]) {
-      mapFunc(ctx.toEdgeTriplet).foreach { kv =>
-        val id = kv._1
-        val msg = kv._2
-        if (id == ctx.srcId) {
-          ctx.sendToSrc(msg)
-        } else {
-          assert(id == ctx.dstId)
-          ctx.sendToDst(msg)
-        }
-      }
-    }
-
-    val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
-    val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
-    val tripletFields = new TripletFields(mapUsesSrcAttr, mapUsesDstAttr, true)
-
-    aggregateMessagesWithActiveSet(sendMsg, reduceFunc, tripletFields, activeSetOpt)
-  }
-
   override def aggregateMessagesWithActiveSet[A: ClassTag](
       sendMsg: EdgeContext[VD, ED, A] => Unit,
       mergeMsg: (A, A) => A,
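
One behavioral nuance of the removal: the old override inspected mapFunc's closure (accessesVertexAttr) to ship only the vertex attributes actually read, whereas the GraphXUtils proxy conservatively passes TripletFields.All. Code migrating to aggregateMessages recovers the optimization by declaring the fields it needs; a hedged sketch on an assumed graph:

    // Counting out-neighbors reads no vertex attributes, so declare that;
    // GraphX can then skip shipping vertex data to the edge partitions.
    val outNeighborCounts: VertexRDD[Int] =
      graph.aggregateMessages[Int](ctx => ctx.sendToSrc(1), _ + _, TripletFields.None)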

http://git-wip-us.apache.org/repos/asf/spark/blob/56d49397/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 16300e0..78a5cb0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -40,17 +40,6 @@ object SVDPlusPlus {
     extends Serializable
 
   /**
-   * This method is now replaced by the updated version of `run()` and returns exactly
-   * the same result.
-   */
-  @deprecated("Call run()", "1.4.0")
-  def runSVDPlusPlus(edges: RDD[Edge[Double]], conf: Conf)
-    : (Graph[(Array[Double], Array[Double], Double, Double), Double], Double) =
-  {
-    run(edges, conf)
-  }
-
-  /**
    * Implement SVD++ based on "Factorization Meets the Neighborhood:
    * a Multifaceted Collaborative Filtering Model",
    * available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
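
For callers, this removal is a pure rename: run() takes the same arguments and returns the same (model graph, Double) pair the deprecated alias forwarded to. A hedged sketch with placeholder data and parameter values, assuming the Conf fields are rank, maxIters, minVal, maxVal, gamma1, gamma2, gamma6 and gamma7:

    // Hypothetical ratings: Edge(userId, itemId, rating).
    val ratings: RDD[Edge[Double]] = sc.parallelize(Seq(
      Edge(1L, 101L, 4.0), Edge(2L, 101L, 5.0), Edge(2L, 102L, 3.0)))
    val conf = new SVDPlusPlus.Conf(
      rank = 2, maxIters = 10, minVal = 0.0, maxVal = 5.0,
      gamma1 = 0.007, gamma2 = 0.007, gamma6 = 0.005, gamma7 = 0.015)
    // Before this commit: SVDPlusPlus.runSVDPlusPlus(ratings, conf)
    val (model, mean) = SVDPlusPlus.run(ratings, conf)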

http://git-wip-us.apache.org/repos/asf/spark/blob/56d49397/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 2fbc6f0..f497e00 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -221,7 +221,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
       val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2)))
       val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
       val graph = Graph(vertices, edges).reverse
-      val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
+      val result = GraphXUtils.mapReduceTriplets[Int, Int, Int](
+        graph, et => Iterator((et.dstId, et.srcAttr)), _ + _)
       assert(result.collect().toSet === Set((1L, 2)))
     }
   }
@@ -281,49 +282,6 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("mapReduceTriplets") {
-    withSpark { sc =>
-      val n = 5
-      val star = starGraph(sc, n).mapVertices { (_, _) => 0 }.cache()
-      val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
-      val neighborDegreeSums = starDeg.mapReduceTriplets(
-        edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
-        (a: Int, b: Int) => a + b)
-      assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
-
-      // activeSetOpt
-      val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexId, y: VertexId)
-      val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
-      val vids = complete.mapVertices((vid, attr) => vid).cache()
-      val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
-      val numEvenNeighbors = vids.mapReduceTriplets(et => {
-        // Map function should only run on edges with destination in the active set
-        if (et.dstId % 2 != 0) {
-          throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
-        }
-        Iterator((et.srcId, 1))
-      }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect().toSet
-      assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)
-
-      // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
-      val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x + 1) % n: VertexId)), 3)
-      val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache()
-      val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
-      val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) =>
-        newOpt.getOrElse(old)
-      }
-      val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
-        // Map function should only run on edges with source in the active set
-        if (et.srcId % 2 != 1) {
-          throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
-        }
-        Iterator((et.dstId, 1))
-      }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect().toSet
-      assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)
-
-    }
-  }
-
   test("aggregateMessages") {
     withSpark { sc =>
       val n = 5
@@ -347,7 +305,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
       val reverseStarDegrees = reverseStar.outerJoinVertices(reverseStar.outDegrees) {
         (vid, a, bOpt) => bOpt.getOrElse(0)
       }
-      val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
+      val neighborDegreeSums = GraphXUtils.mapReduceTriplets[Int, Int, Int](
+        reverseStarDegrees,
         et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
         (a: Int, b: Int) => a + b).collect().toSet
       assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
@@ -420,7 +379,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
       val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
         numEdgePartitions)
       val graph = Graph.fromEdgeTuples(edges, 1)
-      val neighborAttrSums = graph.mapReduceTriplets[Int](
+      val neighborAttrSums = GraphXUtils.mapReduceTriplets[Int, Int, Int](
+        graph,
         et => Iterator((et.dstId, et.srcAttr)), _ + _)
       assert(neighborAttrSums.collect().toSet === Set((0: VertexId, n)))
     } finally {
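
The dedicated "mapReduceTriplets" test above is dropped rather than ported because its activeSetOpt cases have no public equivalent: aggregateMessages takes no active set, and that path is now exercised only through the package-private proxy, as the rewritten tests show. Its first assertion, the star-graph neighbor degree sums, would read as follows on the public API (a sketch reusing the old test's starDeg and n):

    // Hedged sketch: neighbor degree sums on the star graph via aggregateMessages.
    val neighborDegreeSums = starDeg.aggregateMessages[Int](
      ctx => { ctx.sendToSrc(ctx.dstAttr); ctx.sendToDst(ctx.srcAttr) },
      (a, b) => a + b)
    assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)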

http://git-wip-us.apache.org/repos/asf/spark/blob/56d49397/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 6abab7f..65375a3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -242,6 +242,12 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedPythonFunction"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedFunction"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedFunction$")
+      ) ++ Seq(
+        // SPARK-12995 Remove deprecated APIs in graphx
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.lib.SVDPlusPlus.runSVDPlusPlus"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets$default$3"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.impl.GraphImpl.mapReduceTriplets")
       )
     case v if v.startsWith("1.6") =>
       Seq(
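
The mapReduceTriplets$default$3 exclusion exists because scalac compiles each default argument into a synthetic method on the enclosing type, so deleting a method with defaults also deletes those synthetics, and MiMa reports each one. An illustration of the lowering (not Spark code):

    class Example {
      def f(x: Int, y: Int = 0): Int = x + y
      // scalac also emits a synthetic method roughly equivalent to
      //   def f$default$2: Int = 0
      // which MiMa tracks as part of the binary interface.
    }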

