You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/06/06 08:33:17 UTC

git commit: [SPARK-1552] Fix type comparison bug in {map, outerJoin}Vertices

Repository: spark
Updated Branches:
  refs/heads/master 41db44c42 -> 8d85359f8


[SPARK-1552] Fix type comparison bug in {map,outerJoin}Vertices

In GraphImpl, mapVertices and outerJoinVertices use a more efficient implementation when the map function conserves vertex attribute types. This is implemented by comparing the ClassTags of the old and new vertex attribute types. However, ClassTags store erased types, so the comparison will return a false positive for types with different type parameters, such as Option[Int] and Option[Double].

This PR resolves the problem by requesting that the compiler generate evidence of equality between the old and new vertex attribute types, and providing a default value for the evidence parameter if the two types are not equal. The methods can then check the value of the evidence parameter to see whether the types are equal.

It also adds a test called "mapVertices changing type with same erased type" that failed before the PR and succeeds now.

Callers of mapVertices and outerJoinVertices can no longer use a wildcard for a graph's VD type. To avoid "Error occurred in an application involving default arguments," they must bind VD to a type parameter, as this PR does for ShortestPaths and LabelPropagation.

Author: Ankur Dave <an...@gmail.com>

Closes #967 from ankurdave/SPARK-1552 and squashes the following commits:

68a4fff [Ankur Dave] Undo conserve naming
7388705 [Ankur Dave] Remove unnecessary ClassTag for VD parameters
a704e5f [Ankur Dave] Use type equality constraint with default argument
29a5ab7 [Ankur Dave] Add failing test
f458c83 [Ankur Dave] Revert "[SPARK-1552] Fix type comparison bug in mapVertices and outerJoinVertices"
16d6af8 [Ankur Dave] [SPARK-1552] Fix type comparison bug in mapVertices and outerJoinVertices


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d85359f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d85359f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d85359f

Branch: refs/heads/master
Commit: 8d85359f84cc67996b4bcf1670a8a98ab4f914a2
Parents: 41db44c
Author: Ankur Dave <an...@gmail.com>
Authored: Thu Jun 5 23:33:12 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Jun 5 23:33:12 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/Graph.scala   |  5 ++--
 .../apache/spark/graphx/impl/GraphImpl.scala    | 14 +++++++----
 .../spark/graphx/lib/LabelPropagation.scala     |  2 +-
 .../apache/spark/graphx/lib/ShortestPaths.scala |  2 +-
 .../org/apache/spark/graphx/GraphSuite.scala    | 25 ++++++++++++++++++++
 5 files changed, 40 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d85359f/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 14ae50e..4db45c9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -138,7 +138,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * }}}
    *
    */
-  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
+  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2)
+    (implicit eq: VD =:= VD2 = null): Graph[VD2, ED]
 
   /**
    * Transforms each edge attribute in the graph using the map function.  The map function is not
@@ -348,7 +349,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * }}}
    */
   def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
-      (mapFunc: (VertexId, VD, Option[U]) => VD2)
+      (mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null)
     : Graph[VD2, ED]
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8d85359f/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 15ea05c..ccdaa82 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -104,8 +104,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
   }
 
-  override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
-    if (classTag[VD] equals classTag[VD2]) {
+  override def mapVertices[VD2: ClassTag]
+    (f: (VertexId, VD) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
+    // The implicit parameter eq will be populated by the compiler if VD and VD2 are equal, and left
+    // null if not
+    if (eq != null) {
       vertices.cache()
       // The map preserves type, so we can use incremental replication
       val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
@@ -232,8 +235,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 
   override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
       (other: RDD[(VertexId, U)])
-      (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] = {
-    if (classTag[VD] equals classTag[VD2]) {
+      (updateF: (VertexId, VD, Option[U]) => VD2)
+      (implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
+    // The implicit parameter eq will be populated by the compiler if VD and VD2 are equal, and left
+    // null if not
+    if (eq != null) {
       vertices.cache()
       // updateF preserves type, so we can use incremental replication
       val newVerts = vertices.leftJoin(other)(updateF).cache()

http://git-wip-us.apache.org/repos/asf/spark/blob/8d85359f/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
index 776bfb8..82e9e06 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
@@ -41,7 +41,7 @@ object LabelPropagation {
    *
    * @return a graph with vertex attributes containing the label of community affiliation
    */
-  def run[ED: ClassTag](graph: Graph[_, ED], maxSteps: Int): Graph[VertexId, ED] = {
+  def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VertexId, ED] = {
     val lpaGraph = graph.mapVertices { case (vid, _) => vid }
     def sendMessage(e: EdgeTriplet[VertexId, ED]) = {
       Iterator((e.srcId, Map(e.dstAttr -> 1L)), (e.dstId, Map(e.srcAttr -> 1L)))

http://git-wip-us.apache.org/repos/asf/spark/blob/8d85359f/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
index bba070f..590f047 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
@@ -49,7 +49,7 @@ object ShortestPaths {
    * @return a graph where each vertex attribute is a map containing the shortest-path distance to
    * each reachable landmark vertex.
    */
-  def run[ED: ClassTag](graph: Graph[_, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
+  def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
     val spGraph = graph.mapVertices { (vid, attr) =>
       if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d85359f/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index abc25d0..6506bac 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -159,6 +159,31 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("mapVertices changing type with same erased type") {
+    withSpark { sc =>
+      val vertices = sc.parallelize(Array[(Long, Option[java.lang.Integer])](
+        (1L, Some(1)),
+        (2L, Some(2)),
+        (3L, Some(3))
+      ))
+      val edges = sc.parallelize(Array(
+        Edge(1L, 2L, 0),
+        Edge(2L, 3L, 0),
+        Edge(3L, 1L, 0)
+      ))
+      val graph0 = Graph(vertices, edges)
+      // Trigger initial vertex replication
+      graph0.triplets.foreach(x => {})
+      // Change type of replicated vertices, but preserve erased type
+      val graph1 = graph0.mapVertices {
+        case (vid, integerOpt) => integerOpt.map((x: java.lang.Integer) => (x.toDouble): java.lang.Double)
+      }
+      // Access replicated vertices, exposing the erased type
+      val graph2 = graph1.mapTriplets(t => t.srcAttr.get)
+      assert(graph2.edges.map(_.attr).collect.toSet === Set[java.lang.Double](1.0, 2.0, 3.0))
+    }
+  }
+
   test("mapEdges") {
     withSpark { sc =>
       val n = 3