You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/01/08 18:55:26 UTC

spark git commit: [SPARK-4917] Add a function to convert into a graph with canonical edges in GraphOps

Repository: spark
Updated Branches:
  refs/heads/master 8d45834de -> f825e193f


[SPARK-4917] Add a function to convert into a graph with canonical edges in GraphOps

Convert bi-directional edges into uni-directional ones instead of 'canonicalOrientation' in GraphLoader.edgeListFile.
This function is useful when a graph is loaded as it is and then is transformed into one with canonical edges.
It rewrites the vertex ids of edges so that srcIds are bigger than dstIds, and merges the duplicated edges.

Author: Takeshi Yamamuro <li...@gmail.com>

Closes #3760 from maropu/ConvertToCanonicalEdgesSpike and squashes the following commits:

7f8b580 [Takeshi Yamamuro] Add a function to convert into a graph with canonical edges in GraphOps


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f825e193
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f825e193
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f825e193

Branch: refs/heads/master
Commit: f825e193f3357e60949bf4c0174675d0d1a40988
Parents: 8d45834
Author: Takeshi Yamamuro <li...@gmail.com>
Authored: Thu Jan 8 09:55:12 2015 -0800
Committer: Ankur Dave <an...@gmail.com>
Committed: Thu Jan 8 09:55:12 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/graphx/GraphOps.scala      | 26 ++++++++++++++++++++
 .../org/apache/spark/graphx/GraphOpsSuite.scala | 15 +++++++++++
 2 files changed, 41 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f825e193/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 116d1ea..dc8b478 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -279,6 +279,32 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
   }
 
   /**
+   * Convert bi-directional edges into uni-directional ones.
+   * Some graph algorithms (e.g., TriangleCount) assume that an input graph
+   * has its edges in canonical direction.
+   * This function rewrites the vertex ids of edges so that srcIds are bigger
+   * than dstIds, and merges the duplicated edges.
+   *
+   * @param mergeFunc the user defined reduce function which should
+   * be commutative and associative and is used to combine the output
+   * of the map phase
+   *
+   * @return the resulting graph with canonical edges
+   */
+  def convertToCanonicalEdges(
+      mergeFunc: (ED, ED) => ED = (e1, e2) => e1): Graph[VD, ED] = {
+    val newEdges =
+      graph.edges
+        .map {
+          case e if e.srcId < e.dstId => ((e.srcId, e.dstId), e.attr)
+          case e => ((e.dstId, e.srcId), e.attr)
+        }
+        .reduceByKey(mergeFunc)
+        .map(e => new Edge(e._1._1, e._1._2, e._2))
+    Graph(graph.vertices, newEdges)
+  }
+
+  /**
    * Execute a Pregel-like iterative vertex-parallel abstraction.  The
    * user-defined vertex-program `vprog` is executed in parallel on
    * each vertex receiving any inbound messages and computing a new

http://git-wip-us.apache.org/repos/asf/spark/blob/f825e193/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index ea94d4a..9bc8007 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -79,6 +79,21 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test ("convertToCanonicalEdges") {
+    withSpark { sc =>
+      val vertices =
+        sc.parallelize(Seq[(VertexId, String)]((1, "one"), (2, "two"), (3, "three")), 2)
+      val edges =
+        sc.parallelize(Seq(Edge(1, 2, 1), Edge(2, 1, 1), Edge(3, 2, 2)))
+      val g: Graph[String, Int] = Graph(vertices, edges)
+
+      val g1 = g.convertToCanonicalEdges()
+
+      val e = g1.edges.collect().toSet
+      assert(e === Set(Edge(1, 2, 1), Edge(2, 3, 2)))
+    }
+  }
+
   test("collectEdgesCycleDirectionOut") {
     withSpark { sc =>
       val graph = getCycleGraph(sc, 100)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org