You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/06/02 09:00:28 UTC

git commit: Add landmark-based Shortest Path algorithm to graphx.lib

Repository: spark
Updated Branches:
  refs/heads/master d17d22148 -> 9535f4045


Add landmark-based Shortest Path algorithm to graphx.lib

This is a modified version of apache/spark#10.

Author: Ankur Dave <an...@gmail.com>
Author: Andres Perez <an...@tresata.com>

Closes #933 from ankurdave/shortestpaths and squashes the following commits:

03a103c [Ankur Dave] Style fixes
7a1ff48 [Ankur Dave] Improve ShortestPaths documentation
d75c8fc [Ankur Dave] Remove unnecessary VD type param, and pass through ED
d983fb4 [Ankur Dave] Fix style errors
60ed8e6 [Andres Perez] Add Shortest-path computations to graphx.lib with unit tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9535f404
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9535f404
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9535f404

Branch: refs/heads/master
Commit: 9535f4045daf46b084761d7f15f63dc6c2a543dd
Parents: d17d221
Author: Ankur Dave <an...@gmail.com>
Authored: Mon Jun 2 00:00:24 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Jun 2 00:00:24 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/graphx/lib/ShortestPaths.scala | 71 ++++++++++++++++++++
 .../spark/graphx/lib/ShortestPathsSuite.scala   | 49 ++++++++++++++
 2 files changed, 120 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9535f404/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
new file mode 100644
index 0000000..bba070f
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.lib
+
+import org.apache.spark.graphx._
+import scala.reflect.ClassTag
+
+/**
+ * Computes shortest paths to the given set of landmark vertices, returning a graph where each
+ * vertex attribute is a map from each landmark reachable from that vertex to its hop distance.
+ */
+object ShortestPaths {
+  /** Stores a map from the vertex id of a landmark to the distance to that landmark. */
+  type SPMap = Map[VertexId, Int]
+
+  private def makeMap(x: (VertexId, Int)*): SPMap = Map(x: _*)
+
+  private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
+
+  /** Merges two maps, keeping the smaller distance for each landmark present in either. */
+  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
+    (spmap1.keySet ++ spmap2.keySet).map { k =>
+      k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
+    }.toMap
+
+  /**
+   * Computes shortest paths to the given set of landmark vertices, following edge direction:
+   * the distance stored at vertex v for landmark l is the length of a shortest path v -> l.
+   *
+   * @tparam ED the edge attribute type (not used in the computation)
+   * @param graph the graph for which to compute the shortest paths
+   * @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each
+   * landmark.
+   * @return a graph where each vertex attribute is a map containing the shortest-path distance to
+   * each reachable landmark vertex.
+   */
+  def run[ED: ClassTag](graph: Graph[_, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
+    val landmarkSet = landmarks.toSet // avoids a linear Seq scan for every vertex
+    val spGraph = graph.mapVertices { (vid, attr) =>
+      if (landmarkSet.contains(vid)) makeMap(vid -> 0) else makeMap()
+    }
+    val initialMessage = makeMap()
+
+    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = addMaps(attr, msg)
+
+    // Distances flow against edge direction: a path from dst to a landmark, prefixed with the
+    // edge src -> dst, gives src a path one hop longer. Only send if it improves src's map.
+    def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
+      val newAttr = incrementMap(edge.dstAttr)
+      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
+      else Iterator.empty
+    }
+
+    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9535f404/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
new file mode 100644
index 0000000..265827b
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.lib
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.lib._
+import org.apache.spark.graphx.util.GraphGenerators
+import org.apache.spark.rdd._
+
+class ShortestPathsSuite extends FunSuite with LocalSparkContext {
+
+  test("Shortest Path Computations") {
+    withSpark { sc =>
+      // Expected hop distances from each vertex to landmarks 1 and 4.
+      val shortestPaths = Set(
+        (1, Map(1 -> 0, 4 -> 2)), (2, Map(1 -> 1, 4 -> 2)), (3, Map(1 -> 2, 4 -> 1)),
+        (4, Map(1 -> 2, 4 -> 0)), (5, Map(1 -> 1, 4 -> 1)), (6, Map(1 -> 3, 4 -> 1)))
+      // Add every edge in both directions so the graph is effectively undirected.
+      val edgeSeq = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6)).flatMap {
+        case e => Seq(e, e.swap)
+      }
+      val edges = sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) }
+      val graph = Graph.fromEdgeTuples(edges, 1)
+      val landmarks = Seq(1, 4).map(_.toLong)
+      val results = ShortestPaths.run(graph, landmarks).vertices.collect().toSet
+      assert(results === shortestPaths)
+    }
+  }
+
+}