You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:59:45 UTC

[21/50] git commit: Remove aggregateNeighbors

Remove aggregateNeighbors


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1bd5cefc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1bd5cefc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1bd5cefc

Branch: refs/heads/master
Commit: 1bd5cefcae2769d48ad5ef4b8197193371c754da
Parents: e2d25d2
Author: Ankur Dave <an...@gmail.com>
Authored: Mon Jan 13 16:15:10 2014 -0800
Committer: Ankur Dave <an...@gmail.com>
Committed: Mon Jan 13 17:03:03 2014 -0800

----------------------------------------------------------------------
 docs/graphx-programming-guide.md                | 17 ------
 .../org/apache/spark/graphx/GraphOps.scala      | 64 ++------------------
 .../org/apache/spark/graphx/GraphOpsSuite.scala | 26 --------
 3 files changed, 5 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1bd5cefc/docs/graphx-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 002ba0c..e6afd09 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -519,23 +519,6 @@ val avgAgeOlderFollowers: VertexRDD[Double] =
 > are constant sized (e.g., floats and addition instead of lists and concatenation).  More
 > precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex.
 
-Because it is often necessary to aggregate information about neighboring vertices we also provide an
-alternative interface defined in [`GraphOps`][GraphOps]:
-
-{% highlight scala %}
-def aggregateNeighbors[A](
-    map: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
-    reduce: (A, A) => A,
-    edgeDir: EdgeDirection)
-  : VertexRDD[A]
-{% endhighlight %}
-
-The `aggregateNeighbors` operator is implemented directly on top of `mapReduceTriplets` but allows
-the user to define the logic in a more vertex centric manner.  Here the `map` function is provided
-the vertex to which the message is sent as well as one of the edges and returns the optional message
-value.  The `edgeDir` determines whether the `map` function is run on `In`, `Out`, or `All` edges
-adjacent to each vertex.
-
 ### Computing Degree Information
 
 A common aggregation task is computing the degree of each vertex: the number of edges adjacent to

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1bd5cefc/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index a0a40e2..578eb33 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -56,60 +56,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
   }
 
   /**
-   * Computes a statistic for the neighborhood of each vertex.
-   *
-   * @param mapFunc the function applied to each edge adjacent to each vertex. The mapFunc can
-   * optionally return `None`, in which case it does not contribute to the final sum.
-   * @param reduceFunc the function used to merge the results of each map operation
-   * @param direction the direction of edges to consider (e.g., In, Out, Both).
-   * @tparam A the aggregation type
-   *
-   * @return an RDD containing tuples of vertex identifiers and
-   * their resulting value. Vertices with no neighbors will not appear in the RDD.
-   *
-   * @example We can use this function to compute the average follower
-   * age for each user:
-   *
-   * {{{
-   * val graph: Graph[Int,Int] = GraphLoader.edgeListFile(sc, "webgraph")
-   * val averageFollowerAge: RDD[(Int, Int)] =
-   *   graph.aggregateNeighbors[(Int,Double)](
-   *     (vid, edge) => Some((edge.otherVertex(vid).data, 1)),
-   *     (a, b) => (a._1 + b._1, a._2 + b._2),
-   *     -1,
-   *     EdgeDirection.In)
-   *     .mapValues{ case (sum,followers) => sum.toDouble / followers}
-   * }}}
-   */
-  def aggregateNeighbors[A: ClassTag](
-      mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
-      reduceFunc: (A, A) => A,
-      dir: EdgeDirection)
-    : VertexRDD[A] = {
-    // Define a new map function over edge triplets
-    val mf = (et: EdgeTriplet[VD,ED]) => {
-      // Compute the message to the dst vertex
-      val dst =
-        if (dir == EdgeDirection.In || dir == EdgeDirection.Both) {
-          mapFunc(et.dstId, et)
-        } else { Option.empty[A] }
-      // Compute the message to the source vertex
-      val src =
-        if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) {
-          mapFunc(et.srcId, et)
-        } else { Option.empty[A] }
-      // construct the return array
-      (src, dst) match {
-        case (None, None) => Iterator.empty
-        case (Some(srcA),None) => Iterator((et.srcId, srcA))
-        case (None, Some(dstA)) => Iterator((et.dstId, dstA))
-        case (Some(srcA), Some(dstA)) => Iterator((et.srcId, srcA), (et.dstId, dstA))
-      }
-    }
-    graph.mapReduceTriplets(mf, reduceFunc)
-  } // end of aggregateNeighbors
-
-  /**
    * Collect the neighbor vertex ids for each vertex.
    *
    * @param edgeDirection the direction along which to collect
@@ -152,11 +98,11 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
    *
    * @return the vertex set of neighboring vertex attributes for each vertex
    */
-  def collectNeighbors(edgeDirection: EdgeDirection) :
-    VertexRDD[ Array[(VertexID, VD)] ] = {
-    val nbrs = graph.aggregateNeighbors[Array[(VertexID,VD)]](
-      (vid, edge) =>
-        Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )),
+  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = {
+    val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]](
+      edge => Iterator(
+        (edge.srcId, Array((edge.dstId, edge.dstAttr))),
+        (edge.dstId, Array((edge.srcId, edge.srcAttr)))),
       (a, b) => a ++ b,
       edgeDirection)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1bd5cefc/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index cd3c0bb..7a90140 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -8,32 +8,6 @@ import org.scalatest.FunSuite
 
 class GraphOpsSuite extends FunSuite with LocalSparkContext {
 
-  test("aggregateNeighbors") {
-    withSpark { sc =>
-      val n = 3
-      val star =
-        Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID))), 1)
-
-      val indegrees = star.aggregateNeighbors(
-        (vid, edge) => Some(1),
-        (a: Int, b: Int) => a + b,
-        EdgeDirection.In)
-      assert(indegrees.collect().toSet === (1 to n).map(x => (x, 1)).toSet)
-
-      val outdegrees = star.aggregateNeighbors(
-        (vid, edge) => Some(1),
-        (a: Int, b: Int) => a + b,
-        EdgeDirection.Out)
-      assert(outdegrees.collect().toSet === Set((0, n)))
-
-      val noVertexValues = star.aggregateNeighbors[Int](
-        (vid: VertexID, edge: EdgeTriplet[Int, Int]) => None,
-        (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"),
-        EdgeDirection.In)
-      assert(noVertexValues.collect().toSet === Set.empty[(VertexID, Int)])
-    }
-  }
-
   test("joinVertices") {
     withSpark { sc =>
       val vertices =