Posted to commits@spark.apache.org by an...@apache.org on 2014/09/19 08:37:12 UTC

git commit: [SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc

Repository: spark
Updated Branches:
  refs/heads/master e76ef5cb8 -> 3bbbdd818


[SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc

VertexRDD.apply had a bug where it ignored the merge function for
duplicate vertices and instead used whichever vertex attribute occurred
first. This commit fixes the bug by passing the merge function through
to ShippableVertexPartition.apply, which merges any duplicates using the
merge function and then fills in missing vertices using the specified
default vertex attribute. This commit also adds a unit test for
VertexRDD.apply.
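
For illustration, a minimal sketch of the now-honored behavior. The
input data, the sum merge function, and the `sc` context are
assumptions for this example, not part of the commit:

    // Hypothetical input: vertex 1L appears twice with different attributes.
    val verts = sc.parallelize(List((1L, 10), (1L, 32)))
    val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
    // The supplied merge function is now actually applied to duplicates,
    // so vertex 1L ends up with 10 + 32 = 42 instead of just 10.
    val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)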

Author: Larry Xiao <xi...@sjtu.edu.cn>
Author: Blie Arkansol <xi...@sjtu.edu.cn>
Author: Ankur Dave <an...@gmail.com>

Closes #1903 from larryxiao/2062 and squashes the following commits:

625aa9d [Blie Arkansol] Merge pull request #1 from ankurdave/SPARK-2062
476770b [Ankur Dave] ShippableVertexPartition.initFrom: Don't run mergeFunc on default values
614059f [Larry Xiao] doc update: note about the default null value vertices construction
dfdb3c9 [Larry Xiao] minor fix
1c70366 [Larry Xiao] scalastyle check: wrap line, parameter list indent 4 spaces
e4ca697 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
6a35ea8 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
4fbc29c [Blie Arkansol] undo unnecessary change
efae765 [Larry Xiao] fix mistakes: should be able to call with or without mergeFunc
b2422f9 [Larry Xiao] Merge branch '2062' of github.com:larryxiao/spark into 2062
52dc7f7 [Larry Xiao] pass mergeFunc to VertexPartitionBase, where merge is handled
581e9ee [Larry Xiao] TODO: VertexRDDSuite
20d80a3 [Larry Xiao] [SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bbbdd81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bbbdd81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bbbdd81

Branch: refs/heads/master
Commit: 3bbbdd8180cf316c6f8dde0e879410b6b29f8cc3
Parents: e76ef5c
Author: Larry Xiao <xi...@sjtu.edu.cn>
Authored: Thu Sep 18 23:32:32 2014 -0700
Committer: Ankur Dave <an...@gmail.com>
Committed: Thu Sep 18 23:33:18 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/graphx/VertexRDD.scala     |  4 +--
 .../graphx/impl/ShippableVertexPartition.scala  | 28 ++++++++++++++++----
 .../apache/spark/graphx/VertexRDDSuite.scala    | 11 ++++++++
 3 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3bbbdd81/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 04fbc9d..2c8b245 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -392,7 +392,7 @@ object VertexRDD {
    */
   def apply[VD: ClassTag](
       vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
-    VertexRDD(vertices, edges, defaultVal, (a, b) => b)
+    VertexRDD(vertices, edges, defaultVal, (a, b) => a)
   }
 
   /**
@@ -419,7 +419,7 @@ object VertexRDD {
       (vertexIter, routingTableIter) =>
         val routingTable =
           if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
-        Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal))
+        Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal, mergeFunc))
     }
     new VertexRDD(vertexPartitions)
   }
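
A note on the change from `(a, b) => b` to `(a, b) => a` above: before
this fix the merge function was ignored downstream and the first
attribute for each vertex silently won, so the no-mergeFunc overload
now delegates with `(a, b) => a` to preserve that first-wins default
once merging is actually applied. A hedged sketch (the data and the
`sc`/`edges` names are assumptions, as in the example above):

    // Vertex 1L appears twice.
    val verts = sc.parallelize(List((1L, "first"), (1L, "second")))
    // Without an explicit mergeFunc, (a, b) => a keeps "first" -- the
    // same result the old code produced by accident.
    val byDefault = VertexRDD(verts, edges, "")
    // With an explicit mergeFunc, the choice is now up to the caller.
    val byMerge = VertexRDD(verts, edges, "", (a: String, b: String) => b)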

http://git-wip-us.apache.org/repos/asf/spark/blob/3bbbdd81/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
index dca54b8..5412d72 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -36,7 +36,7 @@ private[graphx]
 object ShippableVertexPartition {
   /** Construct a `ShippableVertexPartition` from the given vertices without any routing table. */
   def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): ShippableVertexPartition[VD] =
-    apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD])
+    apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD], (a, b) => a)
 
   /**
    * Construct a `ShippableVertexPartition` from the given vertices with the specified routing
@@ -44,10 +44,28 @@ object ShippableVertexPartition {
    */
   def apply[VD: ClassTag](
       iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
-    : ShippableVertexPartition[VD] = {
-    val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
-    val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, (a: VD, b: VD) => a)
-    new ShippableVertexPartition(index, values, mask, routingTable)
+    : ShippableVertexPartition[VD] =
+    apply(iter, routingTable, defaultVal, (a, b) => a)
+
+  /**
+   * Construct a `ShippableVertexPartition` from the given vertices with the specified routing
+   * table, filling in missing vertices mentioned in the routing table using `defaultVal`,
+   * and merging duplicate vertex attributes with `mergeFunc`.
+   */
+  def apply[VD: ClassTag](
+      iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
+      mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
+    // Merge the given vertices using mergeFunc
+    iter.foreach { pair =>
+      map.setMerge(pair._1, pair._2, mergeFunc)
+    }
+    // Fill in missing vertices mentioned in the routing table
+    routingTable.iterator.foreach { vid =>
+      map.changeValue(vid, defaultVal, identity)
+    }
+
+    new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
   }
 
   import scala.language.implicitConversions
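
The construction above follows a merge-then-fill pattern: merge
duplicates from the vertex iterator first, then fill routing-table-only
vertices with the default. A self-contained sketch of the same idea,
assuming a plain mutable.HashMap as a stand-in for the private
GraphXPrimitiveKeyOpenHashMap:

    import scala.collection.mutable

    def buildVertexMap[VD](
        iter: Iterator[(Long, VD)],
        routingVids: Iterator[Long],
        defaultVal: VD,
        mergeFunc: (VD, VD) => VD): mutable.HashMap[Long, VD] = {
      val map = new mutable.HashMap[Long, VD]
      // Merge duplicate vertex attributes with mergeFunc.
      iter.foreach { case (vid, vd) =>
        map.get(vid) match {
          case Some(old) => map(vid) = mergeFunc(old, vd)
          case None      => map(vid) = vd
        }
      }
      // Fill in vertices mentioned only in the routing table. The default
      // value is NOT passed through mergeFunc (see commit 476770b above).
      routingVids.foreach { vid =>
        if (!map.contains(vid)) map(vid) = defaultVal
      }
      map
    }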

http://git-wip-us.apache.org/repos/asf/spark/blob/3bbbdd81/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index cc86baf..42d3f21 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -99,4 +99,15 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("mergeFunc") {
+    // test to see if the mergeFunc is working correctly
+    withSpark { sc =>
+      val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
+      val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+      val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
+      // test merge function
+      assert(rdd.collect.toSet == Set((0L, 0), (1L, 3), (2L, 9)))
+    }
+  }
+
 }
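
To trace the expected output of the test above: vertex 0L appears once
and keeps 0; vertex 1L merges 1 + 2 = 3; vertex 2L merges 3 + 3 + 3 = 9;
hence the asserted Set((0L, 0), (1L, 3), (2L, 9)). With an empty edge
RDD the routing table mentions no vertices, so the defaultVal of 0
fills in nothing.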

