Posted to commits@spark.apache.org by pw...@apache.org on 2014/05/10 23:48:17 UTC

[2/2] git commit: Unify GraphImpl RDDs + other graph load optimizations

Unify GraphImpl RDDs + other graph load optimizations

This PR makes the following changes, primarily in e4fbd329aef85fe2c38b0167255d2a712893d683:

1. *Unify RDDs to avoid zipPartitions.* A graph used to be four RDDs: vertices, edges, routing table, and triplet view. This commit merges them down to two: vertices (with the routing table) and edges (with replicated vertices). The resulting structure is sketched after this list.

2. *Avoid duplicate shuffle in graph building.* We used to do two shuffles when building a graph: one to extract routing information from the edges and move it to the vertices, and another to find vertices that edges refer to but that are missing from the vertex set. With this commit, the latter is done as a side effect of the former: the routing table delivered to each vertex partition already names every vertex its edges reference, so missing vertices can be created locally with a default attribute.

3. *Avoid no-op shuffle when joins are fully eliminated.* This is a side effect of unifying the edges and the triplet view.

4. *Join elimination for mapTriplets.* `GraphImpl#mapTriplets` now inspects the map function's bytecode (via `BytecodeUtils`) to see whether it reads `srcAttr` or `dstAttr`, and skips shipping any vertex attribute the function never touches. A usage example follows this list.

5. *Ship only the needed vertex attributes when upgrading the triplet view.* If the triplet view already contains the source attributes and we now need both, ship only the destination attributes rather than re-shipping both. This is done in `ReplicatedVertexView#upgrade`; a sketch of the decision follows this list.
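
For reference, the unified representation from item 1, condensed from the GraphImpl.scala hunk later in this diff (remaining members elided):

    import scala.reflect.ClassTag
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.impl.ReplicatedVertexView

    class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
        @transient val vertices: VertexRDD[VD],  // attributes + routing table
        @transient val replicatedVertexView: ReplicatedVertexView[VD, ED])
      extends Graph[VD, ED] with Serializable {

      // The edges (with their replicated vertex attributes) live inside the
      // view, so building triplets needs no zipPartitions against other RDDs.
      @transient override val edges: EdgeRDD[ED, VD] = replicatedVertexView.edges

      // ... remaining Graph[VD, ED] members elided ...
    }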
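
An example of the join elimination in item 4, using only the public API (the shipping behavior it illustrates is internal; `graph` stands for any existing Graph[Int, Int]):

    // Neither srcAttr nor dstAttr is read here, so the bytecode check in
    // GraphImpl#mapTriplets finds that no vertex attributes are needed and
    // performs no shipping join before mapping over the triplets.
    val doubled: Graph[Int, Int] = graph.mapTriplets(et => et.attr * 2)

    // This function reads srcAttr only, so only source attributes are shipped.
    val weighted: Graph[Int, Int] = graph.mapTriplets(et => et.srcAttr * et.attr)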
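
And a minimal sketch of the upgrade decision in item 5. The body of ReplicatedVertexView#upgrade is not included in this diff, so the hasSrcId/hasDstId flags below are illustrative assumptions; shipVertexAttributes and EdgePartition.updateVertices are the helpers added in this diff:

    // Sketch only: ship just the attributes the triplet view is missing.
    def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean) {
      val shipSrc = includeSrc && !hasSrcId  // hasSrcId/hasDstId: assumed flags
      val shipDst = includeDst && !hasDstId  // recording what is already shipped
      if (shipSrc || shipDst) {
        // E.g. source attrs already present and both now needed:
        // shipSrc = false, shipDst = true, so only dst attrs cross the network.
        val shipped = vertices.shipVertexAttributes(shipSrc, shipDst)
        // ... join `shipped` into each EdgePartition via updateVertices ...
      }
    }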

Author: Ankur Dave <an...@gmail.com>

Closes #497 from ankurdave/unify-rdds and squashes the following commits:

332ab43 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds
4933e2e [Ankur Dave] Exclude RoutingTable from binary compatibility check
5ba8789 [Ankur Dave] Add GraphX upgrade guide from Spark 0.9.1
13ac845 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds
a04765c [Ankur Dave] Remove unnecessary toOps call
57202e8 [Ankur Dave] Replace case with pair parameter
75af062 [Ankur Dave] Add explicit return types
04d3ae5 [Ankur Dave] Convert implicit parameter to context bound
c88b269 [Ankur Dave] Revert upgradeIterator to if-in-a-loop
0d3584c [Ankur Dave] EdgePartition.size should be val
2a928b2 [Ankur Dave] Set locality wait
10b3596 [Ankur Dave] Clean up public API
ae36110 [Ankur Dave] Fix style errors
e4fbd32 [Ankur Dave] Unify GraphImpl RDDs + other graph load optimizations
d6d60e2 [Ankur Dave] In GraphLoader, coalesce to minEdgePartitions
62c7b78 [Ankur Dave] In Analytics, take PageRank numIter
d64e8d4 [Ankur Dave] Log current Pregel iteration


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/905173df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/905173df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/905173df

Branch: refs/heads/master
Commit: 905173df57b90f90ebafb22e43f55164445330e6
Parents: 6c2691d
Author: Ankur Dave <an...@gmail.com>
Authored: Sat May 10 14:48:07 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sat May 10 14:48:07 2014 -0700

----------------------------------------------------------------------
 docs/graphx-programming-guide.md                |  22 +-
 .../scala/org/apache/spark/graphx/EdgeRDD.scala |  56 +--
 .../org/apache/spark/graphx/EdgeTriplet.scala   |   2 +
 .../scala/org/apache/spark/graphx/Graph.scala   |   2 +-
 .../spark/graphx/GraphKryoRegistrator.scala     |   8 +-
 .../org/apache/spark/graphx/GraphLoader.scala   |  10 +-
 .../org/apache/spark/graphx/GraphOps.scala      |  17 +-
 .../scala/org/apache/spark/graphx/Pregel.scala  |   6 +-
 .../org/apache/spark/graphx/VertexRDD.scala     | 166 ++++++---
 .../spark/graphx/impl/EdgePartition.scala       | 132 +++++--
 .../graphx/impl/EdgePartitionBuilder.scala      |  18 +-
 .../spark/graphx/impl/EdgeTripletIterator.scala |  50 ++-
 .../apache/spark/graphx/impl/GraphImpl.scala    | 344 ++++++++-----------
 .../spark/graphx/impl/MessageToPartition.scala  |  21 +-
 .../graphx/impl/ReplicatedVertexView.scala      | 238 ++++---------
 .../apache/spark/graphx/impl/RoutingTable.scala |  82 -----
 .../graphx/impl/RoutingTablePartition.scala     | 158 +++++++++
 .../apache/spark/graphx/impl/Serializers.scala  |  29 ++
 .../graphx/impl/ShippableVertexPartition.scala  | 149 ++++++++
 .../spark/graphx/impl/VertexPartition.scala     | 269 ++-------------
 .../spark/graphx/impl/VertexPartitionBase.scala |  91 +++++
 .../graphx/impl/VertexPartitionBaseOps.scala    | 245 +++++++++++++
 .../org/apache/spark/graphx/lib/Analytics.scala |   8 +-
 .../org/apache/spark/graphx/GraphSuite.scala    |  10 +-
 .../spark/graphx/impl/EdgePartitionSuite.scala  |  48 ++-
 .../graphx/impl/EdgeTripletIteratorSuite.scala  |  10 +-
 .../graphx/impl/VertexPartitionSuite.scala      |  11 -
 project/MimaBuild.scala                         |   2 +
 28 files changed, 1353 insertions(+), 851 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/docs/graphx-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 07be8ba..42ab27b 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -86,6 +86,12 @@ support the [Bagel API](api/scala/index.html#org.apache.spark.bagel.package) and
 [Bagel programming guide](bagel-programming-guide.html). However, we encourage Bagel users to
 explore the new GraphX API and comment on issues that may complicate the transition from Bagel.
 
+## Upgrade Guide from Spark 0.9.1
+
+GraphX in Spark {{site.SPARK_VERSION}} contains one user-facing interface change from Spark 0.9.1. [`EdgeRDD`][EdgeRDD] may now store adjacent vertex attributes to construct the triplets, so it has gained a type parameter. The edges of a graph of type `Graph[VD, ED]` are of type `EdgeRDD[ED, VD]` rather than `EdgeRDD[ED]`.
+
+[EdgeRDD]: api/scala/index.html#org.apache.spark.graphx.EdgeRDD
+
 # Getting Started
 
 To get started you first need to import Spark and GraphX into your project, as follows:
@@ -145,12 +151,12 @@ the vertices and edges of the graph:
 {% highlight scala %}
 class Graph[VD, ED] {
   val vertices: VertexRDD[VD]
-  val edges: EdgeRDD[ED]
+  val edges: EdgeRDD[ED, VD]
 }
 {% endhighlight %}
 
-The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexID,
-VD)]` and `RDD[Edge[ED]]` respectively.  Both `VertexRDD[VD]` and `EdgeRDD[ED]` provide  additional
+The classes `VertexRDD[VD]` and `EdgeRDD[ED, VD]` extend and are optimized versions of `RDD[(VertexID,
+VD)]` and `RDD[Edge[ED]]` respectively.  Both `VertexRDD[VD]` and `EdgeRDD[ED, VD]` provide additional
 functionality built around graph computation and leverage internal optimizations.  We discuss the
 `VertexRDD` and `EdgeRDD` API in greater detail in the section on [vertex and edge
 RDDs](#vertex_and_edge_rdds) but for now they can be thought of as simply RDDs of the form:
@@ -302,7 +308,7 @@ class Graph[VD, ED] {
   val degrees: VertexRDD[Int]
   // Views of the graph as collections =============================================================
   val vertices: VertexRDD[VD]
-  val edges: EdgeRDD[ED]
+  val edges: EdgeRDD[ED, VD]
   val triplets: RDD[EdgeTriplet[VD, ED]]
   // Functions for caching graphs ==================================================================
   def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
@@ -908,7 +914,7 @@ val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
 
 ## EdgeRDDs
 
-The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]` organizes the edges in blocks partitioned using one
+The `EdgeRDD[ED, VD]`, which extends `RDD[Edge[ED]]`, organizes the edges in blocks partitioned using one
 of the various partitioning strategies defined in [`PartitionStrategy`][PartitionStrategy].  Within
 each partition, edge attributes and adjacency structure are stored separately, enabling maximum
 reuse when changing attribute values.
@@ -918,11 +924,11 @@ reuse when changing attribute values.
 The three additional functions exposed by the `EdgeRDD` are:
 {% highlight scala %}
 // Transform the edge attributes while preserving the structure
-def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
+def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
 // Reverse the edges, reusing both attributes and structure
-def reverse: EdgeRDD[ED]
+def reverse: EdgeRDD[ED, VD]
 // Join two `EdgeRDD`s partitioned using the same partitioning strategy.
-def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
+def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
 {% endhighlight %}
 
 In most applications we have found that operations on the `EdgeRDD` are accomplished through the

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index fa78ca9..a8fc095 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -20,16 +20,19 @@ package org.apache.spark.graphx
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
-import org.apache.spark.graphx.impl.EdgePartition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
+import org.apache.spark.graphx.impl.EdgePartition
+
 /**
- * `EdgeRDD[ED]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition
- * for performance.
+ * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
+ * partition for performance. It may additionally store the vertex attributes associated with each
+ * edge to provide the triplet view. Shipping of the vertex attributes is managed by
+ * `impl.ReplicatedVertexView`.
  */
-class EdgeRDD[@specialized ED: ClassTag](
-    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
+class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
+    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
   extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   partitionsRDD.setName("EdgeRDD")
@@ -45,8 +48,12 @@ class EdgeRDD[@specialized ED: ClassTag](
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
-    p.next._2.iterator.map(_.copy())
+    val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
+    if (p.hasNext) {
+      p.next._2.iterator.map(_.copy())
+    } else {
+      Iterator.empty
+    }
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -61,11 +68,15 @@ class EdgeRDD[@specialized ED: ClassTag](
     this
   }
 
-  private[graphx] def mapEdgePartitions[ED2: ClassTag](
-      f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
-    new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
-      val (pid, ep) = iter.next()
-      Iterator(Tuple2(pid, f(pid, ep)))
+  private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
+    new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+      if (iter.hasNext) {
+        val (pid, ep) = iter.next()
+        Iterator(Tuple2(pid, f(pid, ep)))
+      } else {
+        Iterator.empty
+      }
     }, preservesPartitioning = true))
   }
 
@@ -76,7 +87,7 @@ class EdgeRDD[@specialized ED: ClassTag](
    * @param f the function from an edge to a new edge value
    * @return a new EdgeRDD containing the new edge values
    */
-  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] =
+  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
     mapEdgePartitions((pid, part) => part.map(f))
 
   /**
@@ -84,7 +95,14 @@ class EdgeRDD[@specialized ED: ClassTag](
    *
    * @return a new EdgeRDD containing all the edges reversed
    */
-  def reverse: EdgeRDD[ED] = mapEdgePartitions((pid, part) => part.reverse)
+  def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+
+  /** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
+  def filter(
+      epred: EdgeTriplet[VD, ED] => Boolean,
+      vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
+    mapEdgePartitions((pid, part) => part.filter(epred, vpred))
+  }
 
   /**
    * Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -96,19 +114,15 @@ class EdgeRDD[@specialized ED: ClassTag](
    *         with values supplied by `f`
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
-      (other: EdgeRDD[ED2])
-      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
+      (other: EdgeRDD[ED2, _])
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
     val ed2Tag = classTag[ED2]
     val ed3Tag = classTag[ED3]
-    new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+    new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
       (thisIter, otherIter) =>
         val (pid, thisEPart) = thisIter.next()
         val (_, otherEPart) = otherIter.next()
         Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
     })
   }
-
-  private[graphx] def collectVertexIds(): RDD[VertexId] = {
-    partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index dfc6a80..9d473d5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
     if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
 
   override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+
+  def toTuple: ((VertexId, VD), (VertexId, VD), ED) = ((srcId, srcAttr), (dstId, dstAttr), attr)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 5039586..dc5dac4 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * along with their vertex data.
    *
    */
-  @transient val edges: EdgeRDD[ED]
+  @transient val edges: EdgeRDD[ED, VD]
 
   /**
    * An RDD containing the edge triplets, which are edges along with the vertex data associated with

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
index dd380d8..d295d01 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -19,10 +19,11 @@ package org.apache.spark.graphx
 
 import com.esotericsoftware.kryo.Kryo
 
-import org.apache.spark.graphx.impl._
 import org.apache.spark.serializer.KryoRegistrator
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx.impl._
 
 /**
  * Registers GraphX classes with Kryo for improved performance.
@@ -33,8 +34,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[Edge[Object]])
     kryo.register(classOf[MessageToPartition[Object]])
     kryo.register(classOf[VertexBroadcastMsg[Object]])
+    kryo.register(classOf[RoutingTableMessage])
     kryo.register(classOf[(VertexId, Object)])
-    kryo.register(classOf[EdgePartition[Object]])
+    kryo.register(classOf[EdgePartition[Object, Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])
     kryo.register(classOf[VertexAttributeBlock[Object]])

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 1885846..389490c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -47,8 +47,7 @@ object GraphLoader extends Logging {
    * @param path the path to the file (e.g., /home/data/file or hdfs://file)
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
-   * @param minEdgePartitions the number of partitions for the
-   *        the edge RDD
+   * @param minEdgePartitions the number of partitions for the edge RDD
    */
   def edgeListFile(
       sc: SparkContext,
@@ -60,8 +59,9 @@ object GraphLoader extends Logging {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions
-    val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[Int]
+    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
+    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
+      val builder = new EdgePartitionBuilder[Int, Int]
       iter.foreach { line =>
         if (!line.isEmpty && line(0) != '#') {
           val lineArray = line.split("\\s+")
@@ -78,7 +78,7 @@ object GraphLoader extends Logging {
         }
       }
       Iterator((pid, builder.toEdgePartition))
-    }.cache()
+    }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
     edges.count()
 
     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 4997fbc..edd5b79 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.graphx
 
 import scala.reflect.ClassTag
-import org.apache.spark.SparkContext._
+import scala.util.Random
+
 import org.apache.spark.SparkException
-import org.apache.spark.graphx.lib._
+import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
-import scala.util.Random
+
+import org.apache.spark.graphx.lib._
 
 /**
  * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -43,19 +45,22 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
    * The in-degree of each vertex in the graph.
    * @note Vertices with no in-edges are not returned in the resulting RDD.
    */
-  @transient lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
+  @transient lazy val inDegrees: VertexRDD[Int] =
+    degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")
 
   /**
    * The out-degree of each vertex in the graph.
    * @note Vertices with no out-edges are not returned in the resulting RDD.
    */
-  @transient lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
+  @transient lazy val outDegrees: VertexRDD[Int] =
+    degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")
 
   /**
    * The degree of each vertex in the graph.
    * @note Vertices with no edges are not returned in the resulting RDD.
    */
-  @transient lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
+  @transient lazy val degrees: VertexRDD[Int] =
+    degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")
 
   /**
    * Computes the neighboring vertex degrees.

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index ac07a59..4572eab 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.graphx
 
 import scala.reflect.ClassTag
+import org.apache.spark.Logging
 
 
 /**
@@ -52,7 +53,7 @@ import scala.reflect.ClassTag
  * }}}
  *
  */
-object Pregel {
+object Pregel extends Logging {
 
   /**
    * Execute a Pregel-like iterative vertex-parallel abstraction.  The
@@ -142,6 +143,9 @@ object Pregel {
       // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
       // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
       activeMessages = messages.count()
+
+      logInfo("Pregel finished iteration " + i)
+
       // Unpersist the RDDs hidden by newly-materialized RDDs
       oldMessages.unpersist(blocking=false)
       newVerts.unpersist(blocking=false)

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index f0fc605..8c62897 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -24,8 +24,11 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.rdd._
 import org.apache.spark.storage.StorageLevel
 
-import org.apache.spark.graphx.impl.MsgRDDFunctions
-import org.apache.spark.graphx.impl.VertexPartition
+import org.apache.spark.graphx.impl.RoutingTablePartition
+import org.apache.spark.graphx.impl.ShippableVertexPartition
+import org.apache.spark.graphx.impl.VertexAttributeBlock
+import org.apache.spark.graphx.impl.RoutingTableMessageRDDFunctions._
+import org.apache.spark.graphx.impl.VertexRDDFunctions._
 
 /**
  * Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
@@ -33,6 +36,9 @@ import org.apache.spark.graphx.impl.VertexPartition
  * joined efficiently. All operations except [[reindex]] preserve the index. To construct a
  * `VertexRDD`, use the [[org.apache.spark.graphx.VertexRDD$ VertexRDD object]].
  *
+ * Additionally, stores routing information to enable joining the vertex attributes with an
+ * [[EdgeRDD]].
+ *
  * @example Construct a `VertexRDD` from a plain RDD:
  * {{{
  * // Construct an initial vertex set
@@ -50,13 +56,11 @@ import org.apache.spark.graphx.impl.VertexPartition
  * @tparam VD the vertex attribute associated with each vertex in the set.
  */
 class VertexRDD[@specialized VD: ClassTag](
-    val partitionsRDD: RDD[VertexPartition[VD]])
+    val partitionsRDD: RDD[ShippableVertexPartition[VD]])
   extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   require(partitionsRDD.partitioner.isDefined)
 
-  partitionsRDD.setName("VertexRDD")
-
   /**
    * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
    * VertexRDD will be based on a different index and can no longer be quickly joined with this
@@ -71,6 +75,16 @@ class VertexRDD[@specialized VD: ClassTag](
   override protected def getPreferredLocations(s: Partition): Seq[String] =
     partitionsRDD.preferredLocations(s)
 
+  override def setName(_name: String): this.type = {
+    if (partitionsRDD.name != null) {
+      partitionsRDD.setName(partitionsRDD.name + ", " + _name)
+    } else {
+      partitionsRDD.setName(_name)
+    }
+    this
+  }
+  setName("VertexRDD")
+
   override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
@@ -90,14 +104,14 @@ class VertexRDD[@specialized VD: ClassTag](
    * Provides the `RDD[(VertexId, VD)]` equivalent output.
    */
   override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
-    firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
+    firstParent[ShippableVertexPartition[VD]].iterator(part, context).next.iterator
   }
 
   /**
    * Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD.
    */
   private[graphx] def mapVertexPartitions[VD2: ClassTag](
-    f: VertexPartition[VD] => VertexPartition[VD2])
+      f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
     : VertexRDD[VD2] = {
     val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
     new VertexRDD(newPartitionsRDD)
@@ -208,10 +222,8 @@ class VertexRDD[@specialized VD: ClassTag](
       case _ =>
         new VertexRDD[VD3](
           partitionsRDD.zipPartitions(
-            other.partitionBy(this.partitioner.get), preservesPartitioning = true)
-          { (part, msgs) =>
-            val vertexPartition: VertexPartition[VD] = part.next()
-            Iterator(vertexPartition.leftJoin(msgs)(f))
+            other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
+            (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
           }
         )
     }
@@ -254,10 +266,8 @@ class VertexRDD[@specialized VD: ClassTag](
       case _ =>
         new VertexRDD(
           partitionsRDD.zipPartitions(
-            other.partitionBy(this.partitioner.get), preservesPartitioning = true)
-          { (part, msgs) =>
-            val vertexPartition: VertexPartition[VD] = part.next()
-            Iterator(vertexPartition.innerJoin(msgs)(f))
+            other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
+            (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
           }
         )
     }
@@ -276,14 +286,31 @@ class VertexRDD[@specialized VD: ClassTag](
    */
   def aggregateUsingIndex[VD2: ClassTag](
       messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
-    val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
+    val shuffled = messages.copartitionWithVertices(this.partitioner.get)
     val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
-      val vertexPartition: VertexPartition[VD] = thisIter.next()
-      Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
+      thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
     }
     new VertexRDD[VD2](parts)
   }
 
+  /**
+   * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
+   * [[EdgeRDD]].
+   */
+  def reverseRoutingTables(): VertexRDD[VD] =
+    this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
+
+  /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
+  private[graphx] def shipVertexAttributes(
+      shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
+    partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
+  }
+
+  /** Generates an RDD of vertex IDs suitable for shipping to the edge partitions. */
+  private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])] = {
+    partitionsRDD.mapPartitions(_.flatMap(_.shipVertexIds()))
+  }
+
 } // end of VertexRDD
 
 
@@ -293,52 +320,101 @@ class VertexRDD[@specialized VD: ClassTag](
 object VertexRDD {
 
   /**
-   * Construct a `VertexRDD` from an RDD of vertex-attribute pairs.
-   * Duplicate entries are removed arbitrarily.
+   * Constructs a standalone `VertexRDD` (one that is not set up for efficient joins with an
+   * [[EdgeRDD]]) from an RDD of vertex-attribute pairs. Duplicate entries are removed arbitrarily.
    *
    * @tparam VD the vertex attribute type
    *
-   * @param rdd the collection of vertex-attribute pairs
+   * @param vertices the collection of vertex-attribute pairs
    */
-  def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)]): VertexRDD[VD] = {
-    val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
-      case Some(p) => rdd
-      case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
+  def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+    val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
+      case Some(p) => vertices
+      case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
     }
-    val vertexPartitions = partitioned.mapPartitions(
-      iter => Iterator(VertexPartition(iter)),
+    val vertexPartitions = vPartitioned.mapPartitions(
+      iter => Iterator(ShippableVertexPartition(iter)),
       preservesPartitioning = true)
     new VertexRDD(vertexPartitions)
   }
 
   /**
-   * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs, merging duplicates using
-   * `mergeFunc`.
+   * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs. Duplicate vertex entries are
+   * removed arbitrarily. The resulting `VertexRDD` will be joinable with `edges`, and any missing
+   * vertices referred to by `edges` will be created with the attribute `defaultVal`.
    *
    * @tparam VD the vertex attribute type
    *
-   * @param rdd the collection of vertex-attribute pairs
-   * @param mergeFunc the associative, commutative merge function.
+   * @param vertices the collection of vertex-attribute pairs
+   * @param edges the [[EdgeRDD]] that these vertices may be joined with
+   * @param defaultVal the vertex attribute to use when creating missing vertices
    */
-  def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
-    val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
-      case Some(p) => rdd
-      case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
+  def apply[VD: ClassTag](
+      vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
+    VertexRDD(vertices, edges, defaultVal, (a, b) => b)
+  }
+
+  /**
+   * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs. Duplicate vertex entries are
+   * merged using `mergeFunc`. The resulting `VertexRDD` will be joinable with `edges`, and any
+   * missing vertices referred to by `edges` will be created with the attribute `defaultVal`.
+   *
+   * @tparam VD the vertex attribute type
+   *
+   * @param vertices the collection of vertex-attribute pairs
+   * @param edges the [[EdgeRDD]] that these vertices may be joined with
+   * @param defaultVal the vertex attribute to use when creating missing vertices
+   * @param mergeFunc the commutative, associative duplicate vertex attribute merge function
+   */
+  def apply[VD: ClassTag](
+      vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
+    ): VertexRDD[VD] = {
+    val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
+      case Some(p) => vertices
+      case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
+    }
+    val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
+    val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {
+      (vertexIter, routingTableIter) =>
+        val routingTable =
+          if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+        Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal))
     }
-    val vertexPartitions = partitioned.mapPartitions(
-      iter => Iterator(VertexPartition(iter)),
-      preservesPartitioning = true)
     new VertexRDD(vertexPartitions)
   }
 
   /**
-   * Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using
-   * `defaultVal` otherwise.
+   * Constructs a `VertexRDD` containing all vertices referred to in `edges`. The vertices will be
+   * created with the attribute `defaultVal`. The resulting `VertexRDD` will be joinable with
+   * `edges`.
+   *
+   * @tparam VD the vertex attribute type
+   *
+   * @param edges the [[EdgeRDD]] referring to the vertices to create
+   * @param numPartitions the desired number of partitions for the resulting `VertexRDD`
+   * @param defaultVal the vertex attribute to use when creating missing vertices
    */
-  def apply[VD: ClassTag](vids: RDD[VertexId], rdd: RDD[(VertexId, VD)], defaultVal: VD)
-    : VertexRDD[VD] = {
-    VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
-      value.getOrElse(default)
-    }
+  def fromEdges[VD: ClassTag](
+      edges: EdgeRDD[_, _], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
+    val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
+    val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
+      val routingTable =
+        if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+      Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
+    }, preservesPartitioning = true)
+    new VertexRDD(vertexPartitions)
+  }
+
+  private def createRoutingTables(
+      edges: EdgeRDD[_, _], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
+    // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
+    val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
+      Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
+      .setName("VertexRDD.createRoutingTables - vid2pid (aggregation)")
+
+    val numEdgePartitions = edges.partitions.size
+    vid2pid.copartitionWithVertices(vertexPartitioner).mapPartitions(
+      iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
+      preservesPartitioning = true)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index b7c472e..871e81f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -17,39 +17,86 @@
 
 package org.apache.spark.graphx.impl
 
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
 
 /**
- * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are
- * clustered by src.
+ * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
+ * edges are stored in 3 large columnar arrays (src, dst, attribute). The arrays are clustered by
+ * src. There is an optional active vertex set for filtering computation on the edges.
+ *
+ * @tparam ED the edge attribute type
+ * @tparam VD the vertex attribute type
  *
  * @param srcIds the source vertex id of each edge
  * @param dstIds the destination vertex id of each edge
  * @param data the attribute associated with each edge
  * @param index a clustered index on source vertex id
- * @tparam ED the edge attribute type.
+ * @param vertices a map from referenced vertex ids to their corresponding attributes. Must
+ *   contain all vertex ids from `srcIds` and `dstIds`, though not necessarily valid attributes for
+ *   those vertex ids. The mask is not used.
+ * @param activeSet an optional active vertex set for filtering computation on the edges
  */
 private[graphx]
-class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
+class EdgePartition[
+    @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
     @transient val srcIds: Array[VertexId],
     @transient val dstIds: Array[VertexId],
     @transient val data: Array[ED],
-    @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
+    @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
+    @transient val vertices: VertexPartition[VD],
+    @transient val activeSet: Option[VertexSet] = None
+  ) extends Serializable {
+
+  /** Return a new `EdgePartition` with the specified edge data. */
+  def withData[ED2: ClassTag](data_ : Array[ED2]): EdgePartition[ED2, VD] = {
+    new EdgePartition(srcIds, dstIds, data_, index, vertices, activeSet)
+  }
+
+  /** Return a new `EdgePartition` with the specified vertex partition. */
+  def withVertices[VD2: ClassTag](
+      vertices_ : VertexPartition[VD2]): EdgePartition[ED, VD2] = {
+    new EdgePartition(srcIds, dstIds, data, index, vertices_, activeSet)
+  }
+
+  /** Return a new `EdgePartition` with the specified active set, provided as an iterator. */
+  def withActiveSet(iter: Iterator[VertexId]): EdgePartition[ED, VD] = {
+    val newActiveSet = new VertexSet
+    iter.foreach(newActiveSet.add(_))
+    new EdgePartition(srcIds, dstIds, data, index, vertices, Some(newActiveSet))
+  }
+
+  /** Return a new `EdgePartition` with the specified active set. */
+  def withActiveSet(activeSet_ : Option[VertexSet]): EdgePartition[ED, VD] = {
+    new EdgePartition(srcIds, dstIds, data, index, vertices, activeSet_)
+  }
+
+  /** Return a new `EdgePartition` with updates to vertex attributes specified in `iter`. */
+  def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
+    this.withVertices(vertices.innerJoinKeepLeft(iter))
+  }
+
+  /** Look up vid in activeSet, throwing an exception if it is None. */
+  def isActive(vid: VertexId): Boolean = {
+    activeSet.get.contains(vid)
+  }
+
+  /** The number of active vertices, if any exist. */
+  def numActives: Option[Int] = activeSet.map(_.size)
 
   /**
    * Reverse all the edges in this partition.
    *
    * @return a new edge partition with all edges reversed.
    */
-  def reverse: EdgePartition[ED] = {
-    val builder = new EdgePartitionBuilder(size)
+  def reverse: EdgePartition[ED, VD] = {
+    val builder = new EdgePartitionBuilder(size)(classTag[ED], classTag[VD])
     for (e <- iterator) {
       builder.add(e.dstId, e.srcId, e.attr)
     }
-    builder.toEdgePartition
+    builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
   }
 
   /**
@@ -64,7 +111,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * @return a new edge partition with the result of the function `f`
    *         applied to each edge
    */
-  def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = {
+  def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2, VD] = {
     val newData = new Array[ED2](data.size)
     val edge = new Edge[ED]()
     val size = data.size
@@ -76,7 +123,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
       newData(i) = f(edge)
       i += 1
     }
-    new EdgePartition(srcIds, dstIds, newData, index)
+    this.withData(newData)
   }
 
   /**
@@ -91,7 +138,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * @tparam ED2 the type of the new attribute
    * @return a new edge partition with the attribute values replaced
    */
-  def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
+  def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2, VD] = {
     // Faster than iter.toArray, because the expected size is known.
     val newData = new Array[ED2](data.size)
     var i = 0
@@ -100,7 +147,23 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
       i += 1
     }
     assert(newData.size == i)
-    new EdgePartition(srcIds, dstIds, newData, index)
+    this.withData(newData)
+  }
+
+  /**
+   * Construct a new edge partition containing only the edges matching `epred` and where both
+   * vertices match `vpred`.
+   */
+  def filter(
+      epred: EdgeTriplet[VD, ED] => Boolean,
+      vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
+    val filtered = tripletIterator().filter(et =>
+      vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et))
+    val builder = new EdgePartitionBuilder[ED, VD]
+    for (e <- filtered) {
+      builder.add(e.srcId, e.dstId, e.attr)
+    }
+    builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
   }
 
   /**
@@ -119,8 +182,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * @param merge a commutative associative merge operation
    * @return a new edge partition without duplicate edges
    */
-  def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
-    val builder = new EdgePartitionBuilder[ED]
+  def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
+    val builder = new EdgePartitionBuilder[ED, VD]
     var currSrcId: VertexId = null.asInstanceOf[VertexId]
     var currDstId: VertexId = null.asInstanceOf[VertexId]
     var currAttr: ED = null.asInstanceOf[ED]
@@ -141,11 +204,11 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
     if (size > 0) {
       builder.add(currSrcId, currDstId, currAttr)
     }
-    builder.toEdgePartition
+    builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
   }
 
   /**
-   * Apply `f` to all edges present in both `this` and `other` and return a new EdgePartition
+   * Apply `f` to all edges present in both `this` and `other` and return a new `EdgePartition`
    * containing the resulting edges.
    *
    * If there are multiple edges with the same src and dst in `this`, `f` will be invoked once for
@@ -155,9 +218,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * once.
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
-      (other: EdgePartition[ED2])
-      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3] = {
-    val builder = new EdgePartitionBuilder[ED3]
+      (other: EdgePartition[ED2, _])
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3, VD] = {
+    val builder = new EdgePartitionBuilder[ED3, VD]
     var i = 0
     var j = 0
     // For i = index of each edge in `this`...
@@ -175,7 +238,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
       }
       i += 1
     }
-    builder.toEdgePartition
+    builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
   }
 
   /**
@@ -183,7 +246,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    *
    * @return size of the partition
    */
-  def size: Int = srcIds.size
+  val size: Int = srcIds.size
 
   /** The number of unique source vertices in the partition. */
   def indexSize: Int = index.size
@@ -212,9 +275,34 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   }
 
   /**
+   * Get an iterator over the edge triplets in this partition.
+   *
+   * It is safe to keep references to the objects from this iterator.
+   */
+  def tripletIterator(
+      includeSrc: Boolean = true, includeDst: Boolean = true): Iterator[EdgeTriplet[VD, ED]] = {
+    new EdgeTripletIterator(this, includeSrc, includeDst)
+  }
+
+  /**
+   * Upgrade the given edge iterator into a triplet iterator.
+   *
+   * Be careful not to keep references to the objects from this iterator. To improve GC performance
+   * the same object is re-used in `next()`.
+   */
+  def upgradeIterator(
+      edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
+    : Iterator[EdgeTriplet[VD, ED]] = {
+    new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
+  }
+
+  /**
    * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
    * iterator is generated using an index scan, so it is efficient at skipping edges that don't
    * match srcIdPred.
+   *
+   * Be careful not to keep references to the objects from this iterator. To improve GC performance
+   * the same object is re-used in `next()`.
    */
   def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
     index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 63ccccb..ecb49be 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -20,12 +20,14 @@ package org.apache.spark.graphx.impl
 import scala.reflect.ClassTag
 import scala.util.Sorting
 
+import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
+
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
-import org.apache.spark.util.collection.PrimitiveVector
 
 private[graphx]
-class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) {
+class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
+    size: Int = 64) {
   var edges = new PrimitiveVector[Edge[ED]](size)
 
   /** Add a new edge to the partition. */
@@ -33,7 +35,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
     edges += Edge(src, dst, d)
   }
 
-  def toEdgePartition: EdgePartition[ED] = {
+  def toEdgePartition: EdgePartition[ED, VD] = {
     val edgeArray = edges.trim().array
     Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
     val srcIds = new Array[VertexId](edgeArray.size)
@@ -57,6 +59,14 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
         i += 1
       }
     }
-    new EdgePartition(srcIds, dstIds, data, index)
+
+    // Create and populate a VertexPartition with vids from the edges, but no attributes
+    val vidsIter = srcIds.iterator ++ dstIds.iterator
+    val vertexIds = new OpenHashSet[VertexId]
+    vidsIter.foreach(vid => vertexIds.add(vid))
+    val vertices = new VertexPartition(
+      vertexIds, new Array[VD](vertexIds.capacity), vertexIds.getBitSet)
+
+    new EdgePartition(srcIds, dstIds, data, index, vertices)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 220a89d..ebb0b94 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -23,32 +23,62 @@ import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
 
 /**
- * The Iterator type returned when constructing edge triplets. This class technically could be
- * an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to
- * debug / profile.
+ * The Iterator type returned when constructing edge triplets. This could be an anonymous class in
+ * EdgePartition.tripletIterator, but we name it here explicitly so it is easier to debug / profile.
  */
 private[impl]
 class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
-    val vidToIndex: VertexIdToIndexMap,
-    val vertexArray: Array[VD],
-    val edgePartition: EdgePartition[ED])
+    val edgePartition: EdgePartition[ED, VD],
+    val includeSrc: Boolean,
+    val includeDst: Boolean)
   extends Iterator[EdgeTriplet[VD, ED]] {
 
   // Current position in the array.
   private var pos = 0
 
-  private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
-
   override def hasNext: Boolean = pos < edgePartition.size
 
   override def next() = {
     val triplet = new EdgeTriplet[VD, ED]
     triplet.srcId = edgePartition.srcIds(pos)
-    triplet.srcAttr = vmap(triplet.srcId)
+    if (includeSrc) {
+      triplet.srcAttr = edgePartition.vertices(triplet.srcId)
+    }
     triplet.dstId = edgePartition.dstIds(pos)
-    triplet.dstAttr = vmap(triplet.dstId)
+    if (includeDst) {
+      triplet.dstAttr = edgePartition.vertices(triplet.dstId)
+    }
     triplet.attr = edgePartition.data(pos)
     pos += 1
     triplet
   }
 }
+
+/**
+ * An Iterator type for internal use that reuses EdgeTriplet objects. This could be an anonymous
+ * class in EdgePartition.upgradeIterator, but we name it here explicitly so it is easier to debug /
+ * profile.
+ */
+private[impl]
+class ReusingEdgeTripletIterator[VD: ClassTag, ED: ClassTag](
+    val edgeIter: Iterator[Edge[ED]],
+    val edgePartition: EdgePartition[ED, VD],
+    val includeSrc: Boolean,
+    val includeDst: Boolean)
+  extends Iterator[EdgeTriplet[VD, ED]] {
+
+  private val triplet = new EdgeTriplet[VD, ED]
+
+  override def hasNext = edgeIter.hasNext
+
+  override def next() = {
+    triplet.set(edgeIter.next())
+    if (includeSrc) {
+      triplet.srcAttr = edgePartition.vertices(triplet.srcId)
+    }
+    if (includeDst) {
+      triplet.dstAttr = edgePartition.vertices(triplet.dstId)
+    }
+    triplet
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 9eabccd..2f2d0e0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -19,54 +19,45 @@ package org.apache.spark.graphx.impl
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark.util.collection.PrimitiveVector
-import org.apache.spark.{HashPartitioner, Partitioner}
+import org.apache.spark.HashPartitioner
 import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.{RDD, ShuffledRDD}
+import org.apache.spark.storage.StorageLevel
+
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.impl.GraphImpl._
 import org.apache.spark.graphx.impl.MsgRDDFunctions._
 import org.apache.spark.graphx.util.BytecodeUtils
-import org.apache.spark.rdd.{ShuffledRDD, RDD}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.ClosureCleaner
 
 
 /**
- * A graph that supports computation on graphs.
+ * An implementation of [[org.apache.spark.graphx.Graph]] to support computation on graphs.
  *
- * Graphs are represented using two classes of data: vertex-partitioned and
- * edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges`
- * contains edge attributes, which are edge-partitioned. For operations on vertex neighborhoods,
- * vertex attributes are replicated to the edge partitions where they appear as sources or
- * destinations. `routingTable` stores the routing information for shipping vertex attributes to
- * edge partitions. `replicatedVertexView` stores a view of the replicated vertex attributes created
- * using the routing table.
+ * Graphs are represented using two RDDs: `vertices`, which contains vertex attributes and the
+ * routing information for shipping vertex attributes to edge partitions, and
+ * `replicatedVertexView`, which contains edges and the vertex attributes mentioned by each edge.
  */
 class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     @transient val vertices: VertexRDD[VD],
-    @transient val edges: EdgeRDD[ED],
-    @transient val routingTable: RoutingTable,
-    @transient val replicatedVertexView: ReplicatedVertexView[VD])
+    @transient val replicatedVertexView: ReplicatedVertexView[VD, ED])
   extends Graph[VD, ED] with Serializable {
 
   /** Default constructor is provided to support serialization */
-  protected def this() = this(null, null, null, null)
+  protected def this() = this(null, null)
+
+  @transient override val edges: EdgeRDD[ED, VD] = replicatedVertexView.edges
 
   /** Return a RDD that brings edges together with their source and destination vertices. */
-  @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
-    val vdTag = classTag[VD]
-    val edTag = classTag[ED]
-    edges.partitionsRDD.zipPartitions(
-      replicatedVertexView.get(true, true), true) { (ePartIter, vPartIter) =>
-      val (pid, ePart) = ePartIter.next()
-      val (_, vPart) = vPartIter.next()
-      new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
-    }
+  @transient override lazy val triplets: RDD[EdgeTriplet[VD, ED]] = {
+    replicatedVertexView.upgrade(vertices, true, true)
+    replicatedVertexView.edges.partitionsRDD.mapPartitions(_.flatMap {
+      case (pid, part) => part.tripletIterator()
+    })
   }
 
   override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
     vertices.persist(newLevel)
-    edges.persist(newLevel)
+    replicatedVertexView.edges.persist(newLevel)
     this
   }
 
@@ -74,14 +65,15 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
     vertices.unpersist(blocking)
-    replicatedVertexView.unpersist(blocking)
+    // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone
     this
   }
 
   override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
-    val numPartitions = edges.partitions.size
+    val numPartitions = replicatedVertexView.edges.partitions.size
     val edTag = classTag[ED]
-    val newEdges = new EdgeRDD(edges.map { e =>
+    val vdTag = classTag[VD]
+    val newEdges = new EdgeRDD(replicatedVertexView.edges.map { e =>
       val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
 
       // Should we be using 3-tuple or an optimized class
@@ -89,105 +81,79 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     }
       .partitionBy(new HashPartitioner(numPartitions))
       .mapPartitionsWithIndex( { (pid, iter) =>
-        val builder = new EdgePartitionBuilder[ED]()(edTag)
+        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
         iter.foreach { message =>
           val data = message.data
           builder.add(data._1, data._2, data._3)
         }
         val edgePartition = builder.toEdgePartition
         Iterator((pid, edgePartition))
-      }, preservesPartitioning = true).cache())
-    GraphImpl(vertices, newEdges)
+      }, preservesPartitioning = true))
+    GraphImpl.fromExistingRDDs(vertices, newEdges)
   }
 
   override def reverse: Graph[VD, ED] = {
-    val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
-    GraphImpl(vertices, newETable)
+    new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
   }
 
   override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
     if (classTag[VD] equals classTag[VD2]) {
+      vertices.cache()
       // The map preserves type, so we can use incremental replication
       val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
       val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
-      val newReplicatedVertexView = new ReplicatedVertexView[VD2](
-        changedVerts, edges, routingTable,
-        Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
-      new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
+      val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
+        .updateVertices(changedVerts)
+      new GraphImpl(newVerts, newReplicatedVertexView)
     } else {
       // The map does not preserve type, so we must re-replicate all vertices
-      GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable)
+      GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
     }
   }
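  // Editorial sketch, not part of this commit: when f preserves the attribute type,
  // `diff` finds only the vertices whose values changed and `updateVertices` ships
  // just those. Assuming a hypothetical Graph[Int, Int] `g`:
  //   val g2 = g.mapVertices((id, attr) => if (id == 0L) attr + 1 else attr)
  //   // changedVerts contains only vertex 0, so a single attribute is re-shipped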
 
   override def mapEdges[ED2: ClassTag](
       f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
-    val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
-    new GraphImpl(vertices, newETable , routingTable, replicatedVertexView)
+    val newEdges = replicatedVertexView.edges
+      .mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
+    new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
   }
 
   override def mapTriplets[ED2: ClassTag](
       f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
-    val newEdgePartitions =
-      edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) {
-        (ePartIter, vTableReplicatedIter) =>
-        val (ePid, edgePartition) = ePartIter.next()
-        val (vPid, vPart) = vTableReplicatedIter.next()
-        assert(!vTableReplicatedIter.hasNext)
-        assert(ePid == vPid)
-        val et = new EdgeTriplet[VD, ED]
-        val inputIterator = edgePartition.iterator.map { e =>
-          et.set(e)
-          et.srcAttr = vPart(e.srcId)
-          et.dstAttr = vPart(e.dstId)
-          et
-        }
-        // Apply the user function to the vertex partition
-        val outputIter = f(ePid, inputIterator)
-        // Consume the iterator to update the edge attributes
-        val newEdgePartition = edgePartition.map(outputIter)
-        Iterator((ePid, newEdgePartition))
-      }
-    new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView)
+    vertices.cache()
+    val mapUsesSrcAttr = accessesVertexAttr(f, "srcAttr")
+    val mapUsesDstAttr = accessesVertexAttr(f, "dstAttr")
+    replicatedVertexView.upgrade(vertices, mapUsesSrcAttr, mapUsesDstAttr)
+    val newEdges = replicatedVertexView.edges.mapEdgePartitions { (pid, part) =>
+      part.map(f(pid, part.tripletIterator(mapUsesSrcAttr, mapUsesDstAttr)))
+    }
+    new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
   }
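  // Editorial sketch, not part of this commit: because upgrade is called with only
  // the flags the map function actually needs, a function that reads neither vertex
  // attribute triggers no vertex shipping. Assuming a hypothetical Graph[Int, Double] `g`:
  //   val scaled = g.mapTriplets(et => et.attr * 2.0)         // reads no vertex attrs
  //   val biased = g.mapTriplets(et => et.attr + et.srcAttr)  // ships src attributes only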
 
   override def subgraph(
       epred: EdgeTriplet[VD, ED] => Boolean = x => true,
       vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
+    vertices.cache()
     // Filter the vertices, reusing the partitioner and the index from this graph
     val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
-
-    // Filter the edges
-    val edTag = classTag[ED]
-    val newEdges = new EdgeRDD[ED](triplets.filter { et =>
-      vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)
-    }.mapPartitionsWithIndex( { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[ED]()(edTag)
-      iter.foreach { et => builder.add(et.srcId, et.dstId, et.attr) }
-      val edgePartition = builder.toEdgePartition
-      Iterator((pid, edgePartition))
-    }, preservesPartitioning = true)).cache()
-
-    // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
-    // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
-    // an edge.
-    new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView)
-  } // end of subgraph
+    // Filter the triplets. We must always upgrade the triplet view fully because vpred always
+    // runs on both src and dst vertices.
+    replicatedVertexView.upgrade(vertices, true, true)
+    val newEdges = replicatedVertexView.edges.filter(epred, vpred)
+    new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
+  }
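  // Editorial sketch, not part of this commit: subgraph always upgrades both sides
  // because vpred is evaluated on each edge's src and dst. Assuming a hypothetical
  // Graph[Int, Int] `g`:
  //   val sub = g.subgraph(epred = et => et.attr > 0, vpred = (id, attr) => attr >= 0)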
 
   override def mask[VD2: ClassTag, ED2: ClassTag] (
       other: Graph[VD2, ED2]): Graph[VD, ED] = {
     val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
-    val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v }
-    // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
-    // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
-    // an edge.
-    new GraphImpl(newVerts, newEdges, routingTable, replicatedVertexView)
+    val newEdges = replicatedVertexView.edges.innerJoin(other.edges) { (src, dst, v, w) => v }
+    new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
   }
 
   override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
-    ClosureCleaner.clean(merge)
-    val newETable = edges.mapEdgePartitions((pid, part) => part.groupEdges(merge))
-    new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
+    val newEdges = replicatedVertexView.edges.mapEdgePartitions(
+      (pid, part) => part.groupEdges(merge))
+    new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
   }
 
   // ///////////////////////////////////////////////////////////////////////////////////////////////
@@ -199,68 +165,58 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
       reduceFunc: (A, A) => A,
       activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {
 
-    ClosureCleaner.clean(mapFunc)
-    ClosureCleaner.clean(reduceFunc)
+    vertices.cache()
 
     // For each vertex, replicate its attribute only to partitions where it is
     // in the relevant position in an edge.
     val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
     val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
-    val vs = activeSetOpt match {
+    replicatedVertexView.upgrade(vertices, mapUsesSrcAttr, mapUsesDstAttr)
+    val view = activeSetOpt match {
       case Some((activeSet, _)) =>
-        replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
+        replicatedVertexView.withActiveSet(activeSet)
       case None =>
-        replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr)
+        replicatedVertexView
     }
     val activeDirectionOpt = activeSetOpt.map(_._2)
 
     // Map and combine.
-    val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) =>
-      val (ePid, edgePartition) = ePartIter.next()
-      val (vPid, vPart) = vPartIter.next()
-      assert(!vPartIter.hasNext)
-      assert(ePid == vPid)
-      // Choose scan method
-      val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
-      val edgeIter = activeDirectionOpt match {
-        case Some(EdgeDirection.Both) =>
-          if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
-              .filter(e => vPart.isActive(e.dstId))
-          } else {
-            edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
-          }
-        case Some(EdgeDirection.Either) =>
-          // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
-          // the index here. Instead we have to scan all edges and then do the filter.
-          edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
-        case Some(EdgeDirection.Out) =>
-          if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
-          } else {
-            edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
-          }
-        case Some(EdgeDirection.In) =>
-          edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
-        case _ => // None
-          edgePartition.iterator
-      }
-
-      // Scan edges and run the map function
-      val et = new EdgeTriplet[VD, ED]
-      val mapOutputs = edgeIter.flatMap { e =>
-        et.set(e)
-        if (mapUsesSrcAttr) {
-          et.srcAttr = vPart(e.srcId)
-        }
-        if (mapUsesDstAttr) {
-          et.dstAttr = vPart(e.dstId)
+    val preAgg = view.edges.partitionsRDD.mapPartitions(_.flatMap {
+      case (pid, edgePartition) =>
+        // Choose scan method
+        val activeFraction = edgePartition.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
+        val edgeIter = activeDirectionOpt match {
+          case Some(EdgeDirection.Both) =>
+            if (activeFraction < 0.8) {
+              edgePartition.indexIterator(srcVertexId => edgePartition.isActive(srcVertexId))
+                .filter(e => edgePartition.isActive(e.dstId))
+            } else {
+              edgePartition.iterator.filter(e =>
+                edgePartition.isActive(e.srcId) && edgePartition.isActive(e.dstId))
+            }
+          case Some(EdgeDirection.Either) =>
+            // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
+            // the index here. Instead we have to scan all edges and then do the filter.
+            edgePartition.iterator.filter(e =>
+              edgePartition.isActive(e.srcId) || edgePartition.isActive(e.dstId))
+          case Some(EdgeDirection.Out) =>
+            if (activeFraction < 0.8) {
+              edgePartition.indexIterator(srcVertexId => edgePartition.isActive(srcVertexId))
+            } else {
+              edgePartition.iterator.filter(e => edgePartition.isActive(e.srcId))
+            }
+          case Some(EdgeDirection.In) =>
+            edgePartition.iterator.filter(e => edgePartition.isActive(e.dstId))
+          case _ => // None
+            edgePartition.iterator
         }
-        mapFunc(et)
-      }
-      // Note: This doesn't allow users to send messages to arbitrary vertices.
-      vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
-    }
+
+        // Scan edges and run the map function
+        val mapOutputs = edgePartition.upgradeIterator(edgeIter, mapUsesSrcAttr, mapUsesDstAttr)
+          .flatMap(mapFunc(_))
+        // Note: This doesn't allow users to send messages to arbitrary vertices.
+        edgePartition.vertices.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
+    }).setName("GraphImpl.mapReduceTriplets - preAgg")
 
     // do the final reduction reusing the index map
     vertices.aggregateUsingIndex(preAgg, reduceFunc)
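    // Editorial sketch, not part of this commit: activeFraction = numActives / indexSize
    // picks the scan method. For example, 50 active vertices over an index of 1000
    // source IDs gives 0.05 < 0.8, so the clustered index iterator touches only those
    // sources' edges; at 0.8 or above, a linear scan plus filter is cheaper than
    // per-vertex index probes.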
@@ -268,20 +224,19 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 
   override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
       (other: RDD[(VertexId, U)])
-      (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] =
-  {
+      (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] = {
     if (classTag[VD] equals classTag[VD2]) {
+      vertices.cache()
       // updateF preserves type, so we can use incremental replication
-      val newVerts = vertices.leftJoin(other)(updateF)
+      val newVerts = vertices.leftJoin(other)(updateF).cache()
       val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
-      val newReplicatedVertexView = new ReplicatedVertexView[VD2](
-        changedVerts, edges, routingTable,
-        Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
-      new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
+      val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
+        .updateVertices(changedVerts)
+      new GraphImpl(newVerts, newReplicatedVertexView)
     } else {
       // updateF does not preserve type, so we must re-replicate all vertices
       val newVerts = vertices.leftJoin(other)(updateF)
-      GraphImpl(newVerts, edges, routingTable)
+      GraphImpl(newVerts, replicatedVertexView.edges)
     }
   }
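  // Editorial sketch, not part of this commit: the same incremental-replication path
  // as mapVertices applies when updateF preserves the type. Assuming a hypothetical
  // Graph[Int, Int] `g` and degrees: RDD[(VertexId, Int)]:
  //   val g2 = g.outerJoinVertices(degrees) { (id, attr, deg) => deg.getOrElse(attr) }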
 
@@ -298,73 +253,68 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 
 object GraphImpl {
 
+  /** Create a graph from edges, setting referenced vertices to `defaultVertexAttr`. */
   def apply[VD: ClassTag, ED: ClassTag](
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] =
-  {
+      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
     fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
   }
 
+  /** Create a graph from EdgePartitions, setting referenced vertices to `defaultVertexAttr`. */
   def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
-      edgePartitions: RDD[(PartitionID, EdgePartition[ED])],
+      edgePartitions: RDD[(PartitionID, EdgePartition[ED, VD])],
       defaultVertexAttr: VD): GraphImpl[VD, ED] = {
     fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
   }
 
+  /** Create a graph from vertices and edges, setting missing vertices to `defaultVertexAttr`. */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] =
-  {
-    val edgeRDD = createEdgeRDD(edges).cache()
-
-    // Get the set of all vids
-    val partitioner = Partitioner.defaultPartitioner(vertices)
-    val vPartitioned = vertices.partitionBy(partitioner)
-    val vidsFromEdges = collectVertexIdsFromEdges(edgeRDD, partitioner)
-    val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
-      vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
-    }
-
-    val vertexRDD = VertexRDD(vids, vPartitioned, defaultVertexAttr)
-
+      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+    val edgeRDD = createEdgeRDD(edges)(classTag[ED], classTag[VD]).cache()
+    val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
     GraphImpl(vertexRDD, edgeRDD)
   }
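  // Editorial sketch, not part of this commit: VertexRDD(vertices, edgeRDD,
  // defaultVertexAttr) builds the routing tables and fills in vertices that appear
  // only in edges in one pass, replacing the separate collectVertexIdsFromEdges
  // shuffle that the removed code below performed.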
 
+  /** Create a graph from a VertexRDD and an EdgeRDD with an arbitrary replicated vertex type. */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
-      edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
-    // Cache RDDs that are referenced multiple times
-    edges.cache()
-
-    GraphImpl(vertices, edges, new RoutingTable(edges, vertices))
+      edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
+    // Convert the vertex partitions in edges to the correct type
+    val newEdges = edges.mapEdgePartitions(
+      (pid, part) => part.withVertices(part.vertices.map(
+        (vid, attr) => null.asInstanceOf[VD])))
+    GraphImpl.fromExistingRDDs(vertices, newEdges)
   }
 
-  def apply[VD: ClassTag, ED: ClassTag](
+  /**
+   * Create a graph from a VertexRDD and an EdgeRDD with the same replicated vertex type as the
+   * vertices.
+   */
+  def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
-      edges: EdgeRDD[ED],
-      routingTable: RoutingTable): GraphImpl[VD, ED] = {
-    // Cache RDDs that are referenced multiple times. `routingTable` is cached by default, so we
-    // don't cache it explicitly.
-    vertices.cache()
-    edges.cache()
-
-    new GraphImpl(
-      vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable))
+      edges: EdgeRDD[ED, VD]): GraphImpl[VD, ED] = {
+    new GraphImpl(vertices, new ReplicatedVertexView(edges))
   }
 
   /**
-   * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
-   * data structure (RDD[(VertexId, VertexId, ED)]).
-   *
-   * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
-   * pair: the key is the partition id, and the value is an EdgePartition object containing all the
-   * edges in a partition.
+   * Create a graph from an EdgeRDD with the correct vertex type, setting missing vertices to
+   * `defaultVertexAttr`. The vertices will have the same number of partitions as the EdgeRDD.
    */
-  private def createEdgeRDD[ED: ClassTag](
-      edges: RDD[Edge[ED]]): EdgeRDD[ED] = {
+  private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
+      edges: EdgeRDD[ED, VD],
+      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+    edges.cache()
+    val vertices = VertexRDD.fromEdges(edges, edges.partitions.size, defaultVertexAttr)
+    fromExistingRDDs(vertices, edges)
+  }
+
+  /** Create an EdgeRDD from a set of edges. */
+  private def createEdgeRDD[ED: ClassTag, VD: ClassTag](
+      edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
     val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[ED]
+      val builder = new EdgePartitionBuilder[ED, VD]
       iter.foreach { e =>
         builder.add(e.srcId, e.dstId, e.attr)
       }
@@ -373,24 +323,4 @@ object GraphImpl {
     new EdgeRDD(edgePartitions)
   }
 
-  private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
-      edges: EdgeRDD[ED],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    edges.cache()
-    // Get the set of all vids
-    val vids = collectVertexIdsFromEdges(edges, new HashPartitioner(edges.partitions.size))
-    // Create the VertexRDD.
-    val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
-    GraphImpl(vertices, edges)
-  }
-
-  /** Collects all vids mentioned in edges and partitions them by partitioner. */
-  private def collectVertexIdsFromEdges(
-      edges: EdgeRDD[_],
-      partitioner: Partitioner): RDD[(VertexId, Int)] = {
-    // TODO: Consider doing map side distinct before shuffle.
-    new ShuffledRDD[VertexId, Int, (VertexId, Int)](
-      edges.collectVertexIds.map(vid => (vid, 0)), partitioner)
-      .setSerializer(new VertexIdMsgSerializer)
-  }
 } // end of object GraphImpl
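
The GraphImpl changes above are internal; the public construction API is unchanged. As a quick orientation, here is a minimal usage sketch (editorial, not part of this commit; `sc` and the sample edges are assumptions for illustration) showing the two-RDD representation from the user's side:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object TwoRddGraphExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "TwoRddGraphExample")
    // Build a graph from edges alone; vertices referenced by edges receive the
    // default attribute (GraphImpl.apply -> fromEdgeRDD above).
    val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 7), Edge(2L, 3L, 9)))
    val graph: Graph[Int, Int] = Graph.fromEdges(edges, 0)
    // The graph is now backed by exactly two RDDs:
    //   graph.vertices -- vertex attributes plus routing tables
    //   graph.edges    -- edge partitions carrying replicated vertex attributes
    println(graph.triplets.map(t => t.srcId + "->" + t.dstId).collect().mkString(", "))
    sc.stop()
  }
}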

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
index c45ba3d..1c6d7e5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
@@ -89,7 +89,6 @@ class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) {
 
 }
 
-
 private[graphx]
 object MsgRDDFunctions {
   implicit def rdd2PartitionRDDFunctions[T: ClassTag](rdd: RDD[MessageToPartition[T]]) = {
@@ -99,18 +98,28 @@ object MsgRDDFunctions {
   implicit def rdd2vertexMessageRDDFunctions[T: ClassTag](rdd: RDD[VertexBroadcastMsg[T]]) = {
     new VertexBroadcastMsgRDDFunctions(rdd)
   }
+}
 
-  def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexId, T)], partitioner: Partitioner) = {
-    val rdd = new ShuffledRDD[VertexId, T, (VertexId, T)](msgs, partitioner)
+private[graphx]
+class VertexRDDFunctions[VD: ClassTag](self: RDD[(VertexId, VD)]) {
+  def copartitionWithVertices(partitioner: Partitioner): RDD[(VertexId, VD)] = {
+    val rdd = new ShuffledRDD[VertexId, VD, (VertexId, VD)](self, partitioner)
 
     // Set a custom serializer if the data is of int or double type.
-    if (classTag[T] == ClassTag.Int) {
+    if (classTag[VD] == ClassTag.Int) {
       rdd.setSerializer(new IntAggMsgSerializer)
-    } else if (classTag[T] == ClassTag.Long) {
+    } else if (classTag[VD] == ClassTag.Long) {
       rdd.setSerializer(new LongAggMsgSerializer)
-    } else if (classTag[T] == ClassTag.Double) {
+    } else if (classTag[VD] == ClassTag.Double) {
       rdd.setSerializer(new DoubleAggMsgSerializer)
     }
     rdd
   }
 }
+
+private[graphx]
+object VertexRDDFunctions {
+  implicit def rdd2VertexRDDFunctions[VD: ClassTag](rdd: RDD[(VertexId, VD)]) = {
+    new VertexRDDFunctions(rdd)
+  }
+}
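
For context, `copartitionWithVertices` replaces the old `partitionForAggregation` and is only visible inside the graphx package, since the class is `private[graphx]`. Below is a minimal sketch (editorial, not part of this commit) of its intended use, assuming code living in `org.apache.spark.graphx.impl` and Int vertex attributes, which selects `IntAggMsgSerializer`:

package org.apache.spark.graphx.impl

import org.apache.spark.HashPartitioner
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.VertexRDDFunctions._
import org.apache.spark.rdd.RDD

object CopartitionSketch {
  // Shuffle raw (id, attr) pairs to the given number of partitions; because
  // VD = Int here, the shuffle uses the specialized IntAggMsgSerializer.
  def copartition(verts: RDD[(VertexId, Int)], numParts: Int): RDD[(VertexId, Int)] =
    verts.copartitionWithVertices(new HashPartitioner(numParts))
}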