You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/05/10 23:48:16 UTC

[1/2] Unify GraphImpl RDDs + other graph load optimizations

Repository: spark
Updated Branches:
  refs/heads/master 6c2691d0a -> 905173df5


http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index a8154b6..3a0bba1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -21,192 +21,102 @@ import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
 
 import org.apache.spark.graphx._
 
 /**
- * A view of the vertices after they are shipped to the join sites specified in
- * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is
- * specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a
- * fresh view is created.
- *
- * The view is always cached (i.e., once it is evaluated, it remains materialized). This avoids
- * constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for
- * example. However, it means iterative algorithms must manually call `Graph.unpersist` on previous
- * iterations' graphs for best GC performance. See the implementation of
- * [[org.apache.spark.graphx.Pregel]] for an example.
+ * Manages shipping vertex attributes to the edge partitions of an
+ * [[org.apache.spark.graphx.EdgeRDD]]. Vertex attributes may be partially shipped to construct a
+ * triplet view with vertex attributes on only one side, and they may be updated. An active vertex
+ * set may additionally be shipped to the edge partitions. Be careful not to store a reference to
+ * `edges`, since it may be modified when the attribute shipping level is upgraded.
  */
 private[impl]
-class ReplicatedVertexView[VD: ClassTag](
-    updatedVerts: VertexRDD[VD],
-    edges: EdgeRDD[_],
-    routingTable: RoutingTable,
-    prevViewOpt: Option[ReplicatedVertexView[VD]] = None) {
+class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
+    var edges: EdgeRDD[ED, VD],
+    var hasSrcId: Boolean = false,
+    var hasDstId: Boolean = false) {
 
   /**
-   * Within each edge partition, create a local map from vid to an index into the attribute
-   * array. Each map contains a superset of the vertices that it will receive, because it stores
-   * vids from both the source and destination of edges. It must always include both source and
-   * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
+   * Return a new `ReplicatedVertexView` with the specified `EdgeRDD`, which must have the same
+   * shipping level.
    */
-  private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
-    case Some(prevView) =>
-      prevView.localVertexIdMap
-    case None =>
-      edges.partitionsRDD.mapPartitions(_.map {
-        case (pid, epart) =>
-          val vidToIndex = new VertexIdToIndexMap
-          epart.foreach { e =>
-            vidToIndex.add(e.srcId)
-            vidToIndex.add(e.dstId)
-          }
-          (pid, vidToIndex)
-      }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap")
-  }
-
-  private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
-  private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false)
-  private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true)
-  private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false)
-
-  def unpersist(blocking: Boolean = true): ReplicatedVertexView[VD] = {
-    bothAttrs.unpersist(blocking)
-    srcAttrOnly.unpersist(blocking)
-    dstAttrOnly.unpersist(blocking)
-    noAttrs.unpersist(blocking)
-    // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it
-    // without modification
-    this
+  def withEdges[VD2: ClassTag, ED2: ClassTag](
+      edges_ : EdgeRDD[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = {
+    new ReplicatedVertexView(edges_, hasSrcId, hasDstId)
   }
 
-  def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = {
-    (includeSrc, includeDst) match {
-      case (true, true) => bothAttrs
-      case (true, false) => srcAttrOnly
-      case (false, true) => dstAttrOnly
-      case (false, false) => noAttrs
-    }
+  /**
+   * Return a new `ReplicatedVertexView` where edges are reversed and shipping levels are swapped to
+   * match.
+   */
+  def reverse(): ReplicatedVertexView[VD, ED] = {
+    val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
+    new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
   }
 
-  def get(
-      includeSrc: Boolean,
-      includeDst: Boolean,
-      actives: VertexRDD[_]): RDD[(PartitionID, VertexPartition[VD])] = {
-    // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
-    // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
-    // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
-    // also shipped there.
-    val shippedActives = routingTable.get(true, true)
-      .zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _))
-      .partitionBy(edges.partitioner.get)
-    // Update the view with shippedActives, setting activeness flags in the resulting
-    // VertexPartitions
-    get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) =>
-      val (pid, vPart) = viewIter.next()
-      val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator))
-      Iterator((pid, newPart))
+  /**
+   * Upgrade the shipping level in-place by shipping the missing vertex attributes from `vertices`;
+   * read `edges` afterwards for the upgraded view. If anything is shipped, both flags are then set
+   * to the requested levels, so a side already shipped but not requested is recorded as absent.
+   */
+  def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean): Unit = {
+    val shipSrc = includeSrc && !hasSrcId
+    val shipDst = includeDst && !hasDstId
+    if (shipSrc || shipDst) {
+      val shippedVerts: RDD[(Int, VertexAttributeBlock[VD])] =
+        vertices.shipVertexAttributes(shipSrc, shipDst)
+          .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format(
+            includeSrc, includeDst, shipSrc, shipDst))
+          .partitionBy(edges.partitioner.get)
+      val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+        (ePartIter, shippedVertsIter) => ePartIter.map {
+          case (pid, edgePartition) =>
+            (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
+        }
+      })
+      edges = newEdges
+      hasSrcId = includeSrc
+      hasDstId = includeDst
     }
   }
 
-  private def create(includeSrc: Boolean, includeDst: Boolean)
-    : RDD[(PartitionID, VertexPartition[VD])] = {
-    val vdTag = classTag[VD]
-
-    // Ship vertex attributes to edge partitions according to vertexPlacement
-    val verts = updatedVerts.partitionsRDD
-    val shippedVerts = routingTable.get(includeSrc, includeDst)
-      .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdTag))
+  /**
+   * Return a new `ReplicatedVertexView` where the `activeSet` in each edge partition contains only
+   * vertex ids present in `actives`. This ships a vertex id to all edge partitions where it is
+   * referenced, ignoring the attribute shipping level.
+   */
+  def withActiveSet(actives: VertexRDD[_]): ReplicatedVertexView[VD, ED] = {
+    val shippedActives = actives.shipVertexIds()
+      .setName("ReplicatedVertexView.withActiveSet - shippedActives (broadcast)")
       .partitionBy(edges.partitioner.get)
-    // TODO: Consider using a specialized shuffler.
-
-    prevViewOpt match {
-      case Some(prevView) =>
-        // Update prevView with shippedVerts, setting staleness flags in the resulting
-        // VertexPartitions
-        prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) {
-          (prevViewIter, shippedVertsIter) =>
-            val (pid, prevVPart) = prevViewIter.next()
-            val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
-            Iterator((pid, newVPart))
-        }.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst))
 
-      case None =>
-        // Within each edge partition, place the shipped vertex attributes into the correct
-        // locations specified in localVertexIdMap
-        localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
-          val (pid, vidToIndex) = mapIter.next()
-          assert(!mapIter.hasNext)
-          // Populate the vertex array using the vidToIndex map
-          val vertexArray = vdTag.newArray(vidToIndex.capacity)
-          for ((_, block) <- shippedVertsIter) {
-            for (i <- 0 until block.vids.size) {
-              val vid = block.vids(i)
-              val attr = block.attrs(i)
-              val ind = vidToIndex.getPos(vid)
-              vertexArray(ind) = attr
-            }
-          }
-          val newVPart = new VertexPartition(
-            vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag)
-          Iterator((pid, newVPart))
-        }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst))
-    }
-  }
-}
-
-private object ReplicatedVertexView {
-  protected def buildBuffer[VD: ClassTag](
-      pid2vidIter: Iterator[Array[Array[VertexId]]],
-      vertexPartIter: Iterator[VertexPartition[VD]]) = {
-    val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
-    val vertexPart: VertexPartition[VD] = vertexPartIter.next()
-
-    Iterator.tabulate(pid2vid.size) { pid =>
-      val vidsCandidate = pid2vid(pid)
-      val size = vidsCandidate.length
-      val vids = new PrimitiveVector[VertexId](pid2vid(pid).size)
-      val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
-      var i = 0
-      while (i < size) {
-        val vid = vidsCandidate(i)
-        if (vertexPart.isDefined(vid)) {
-          vids += vid
-          attrs += vertexPart(vid)
-        }
-        i += 1
+    val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
+      (ePartIter, shippedActivesIter) => ePartIter.map {
+        case (pid, edgePartition) =>
+          (pid, edgePartition.withActiveSet(shippedActivesIter.flatMap(_._2.iterator)))
       }
-      (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
-    }
+    })
+    new ReplicatedVertexView(newEdges, hasSrcId, hasDstId)
   }
 
-  protected def buildActiveBuffer(
-      pid2vidIter: Iterator[Array[Array[VertexId]]],
-      activePartIter: Iterator[VertexPartition[_]])
-    : Iterator[(Int, Array[VertexId])] = {
-    val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
-    val activePart: VertexPartition[_] = activePartIter.next()
+  /**
+   * Return a new `ReplicatedVertexView` where vertex attributes in each edge partition are updated
+   * using `updates`. This ships a vertex attribute only to the edge partitions where it is in the
+   * position(s) specified by the attribute shipping level.
+   */
+  def updateVertices(updates: VertexRDD[VD]): ReplicatedVertexView[VD, ED] = {
+    val shippedVerts = updates.shipVertexAttributes(hasSrcId, hasDstId)
+      .setName("ReplicatedVertexView.updateVertices - shippedVerts %s %s (broadcast)".format(
+        hasSrcId, hasDstId))
+      .partitionBy(edges.partitioner.get)
 
-    Iterator.tabulate(pid2vid.size) { pid =>
-      val vidsCandidate = pid2vid(pid)
-      val size = vidsCandidate.length
-      val actives = new PrimitiveVector[VertexId](vidsCandidate.size)
-      var i = 0
-      while (i < size) {
-        val vid = vidsCandidate(i)
-        if (activePart.isDefined(vid)) {
-          actives += vid
-        }
-        i += 1
+    val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+      (ePartIter, shippedVertsIter) => ePartIter.map {
+        case (pid, edgePartition) =>
+          (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
       }
-      (pid, actives.trim().array)
-    }
+    })
+    new ReplicatedVertexView(newEdges, hasSrcId, hasDstId)
   }
 }
-
-private[graphx]
-class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD])
-  extends Serializable {
-  def iterator: Iterator[(VertexId, VD)] =
-    (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
deleted file mode 100644
index 022d566..0000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx.impl
-
-import org.apache.spark.SparkContext._
-import org.apache.spark.graphx._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.collection.PrimitiveVector
-
-/**
- * Stores the locations of edge-partition join sites for each vertex attribute; that is, the routing
- * information for shipping vertex attributes to edge partitions. This is always cached because it
- * may be used multiple times in ReplicatedVertexView -- once to ship the vertex attributes and
- * (possibly) once to ship the active-set information.
- */
-private[impl]
-class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
-
-  val bothAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(true, true)
-  val srcAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(true, false)
-  val dstAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(false, true)
-  val noAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(false, false)
-
-  def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] =
-    (includeSrcAttr, includeDstAttr) match {
-      case (true, true) => bothAttrs
-      case (true, false) => srcAttrOnly
-      case (false, true) => dstAttrOnly
-      case (false, false) => noAttrs
-    }
-
-  private def createPid2Vid(
-      includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = {
-    // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
-    val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
-      val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
-      val numEdges = edgePartition.size
-      val vSet = new VertexSet
-      if (includeSrcAttr) {  // Add src vertices to the set.
-        var i = 0
-        while (i < numEdges) {
-          vSet.add(edgePartition.srcIds(i))
-          i += 1
-        }
-      }
-      if (includeDstAttr) {  // Add dst vertices to the set.
-      var i = 0
-        while (i < numEdges) {
-          vSet.add(edgePartition.dstIds(i))
-          i += 1
-        }
-      }
-      vSet.iterator.map { vid => (vid, pid) }
-    }
-
-    val numEdgePartitions = edges.partitions.size
-    vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
-      val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
-      for ((vid, pid) <- iter) {
-        pid2vid(pid) += vid
-      }
-
-      Iterator(pid2vid.map(_.trim().array))
-    }.cache().setName("RoutingTable %s %s".format(includeSrcAttr, includeDstAttr))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
new file mode 100644
index 0000000..927e32a
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
+ * the edge partition references `vid` in the specified `position` (src, dst, or both).
+ */
+private[graphx]
+class RoutingTableMessage(
+    var vid: VertexId,
+    var pid: PartitionID,
+    var position: Byte)
+  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
+  override def _1 = vid
+  override def _2 = (pid, position)
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
+}
+
+private[graphx]
+class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
+  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
+  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
+    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
+      .setSerializer(new RoutingTableMessageSerializer)
+  }
+}
+
+private[graphx]
+object RoutingTableMessageRDDFunctions {
+  import scala.language.implicitConversions
+
+  /** Enriches an `RDD[RoutingTableMessage]` with [[RoutingTableMessageRDDFunctions]] methods. */
+  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage])
+    : RoutingTableMessageRDDFunctions = new RoutingTableMessageRDDFunctions(rdd)
+}
+
+private[graphx]
+object RoutingTablePartition {
+  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
+
+  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
+  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
+    : Iterator[RoutingTableMessage] = {
+    // Determine which positions each vertex id appears in using a map where the low 2 bits
+    // represent src and dst
+    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    edgePartition.srcIds.iterator.foreach { srcId =>
+      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+    }
+    edgePartition.dstIds.iterator.foreach { dstId =>
+      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+    }
+    map.iterator.map { vidAndPosition =>
+      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
+    }
+  }
+
+  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
+  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
+    : RoutingTablePartition = {
+    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
+    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
+    val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
+    for (msg <- iter) {
+      pid2vid(msg.pid) += msg.vid
+      srcFlags(msg.pid) += (msg.position & 0x1) != 0
+      dstFlags(msg.pid) += (msg.position & 0x2) != 0
+    }
+
+    new RoutingTablePartition(pid2vid.zipWithIndex.map {
+      case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
+    })
+  }
+
+  /** Compact the given vector of Booleans into a BitSet. */
+  private def toBitSet(flags: PrimitiveVector[Boolean]): BitSet = {
+    val bitset = new BitSet(flags.size)
+    var i = 0
+    while (i < flags.size) {
+      if (flags(i)) {
+        bitset.set(i)
+      }
+      i += 1
+    }
+    bitset
+  }
+}
+
+/**
+ * Stores the locations of edge-partition join sites for each vertex attribute in a particular
+ * vertex partition. This provides routing information for shipping vertex attributes to edge
+ * partitions.
+ */
+private[graphx]
+class RoutingTablePartition(
+    private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) {
+  /** The maximum number of edge partitions this `RoutingTablePartition` is built to join with. */
+  val numEdgePartitions: Int = routingTable.size
+
+  /** Returns the number of vertices that will be sent to the specified edge partition. */
+  def partitionSize(pid: PartitionID): Int = routingTable(pid)._1.size
+
+  /** Returns an iterator over all vertex ids stored in this `RoutingTablePartition`. */
+  def iterator: Iterator[VertexId] = routingTable.iterator.flatMap(_._1.iterator)
+
+  /** Returns a new RoutingTablePartition reflecting a reversal of all edge directions. */
+  def reverse: RoutingTablePartition = {
+    new RoutingTablePartition(routingTable.map {
+      case (vids, srcVids, dstVids) => (vids, dstVids, srcVids)
+    })
+  }
+
+  /**
+   * Runs `f` on each vertex id to be sent to the specified edge partition. Vertex ids can be
+   * filtered by the position they have in the edge partition.
+   */
+  def foreachWithinEdgePartition
+      (pid: PartitionID, includeSrc: Boolean, includeDst: Boolean)
+      (f: VertexId => Unit): Unit = {
+    val (vidsCandidate, srcVids, dstVids) = routingTable(pid)
+    if (includeSrc && includeDst) {
+      // Avoid checks for performance
+      vidsCandidate.iterator.foreach(f)
+    } else if (!includeSrc && !includeDst) {
+      // Do nothing
+    } else {
+      // Exactly one side requested: its BitSet yields the indices into vidsCandidate to visit
+      val relevantVids = if (includeSrc) srcVids else dstVids
+      relevantVids.iterator.foreach { i => f(vidsCandidate(i)) }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
index 1de42ee..033237f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
@@ -28,6 +28,35 @@ import org.apache.spark.graphx._
 import org.apache.spark.serializer._
 
 private[graphx]
+class RoutingTableMessageSerializer extends Serializer with Serializable {
+  override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+    override def serializeStream(s: OutputStream): SerializationStream =
+      new ShuffleSerializationStream(s) {
+        def writeObject[T: ClassTag](t: T): SerializationStream = {
+          val msg = t.asInstanceOf[RoutingTableMessage]
+          writeVarLong(msg.vid, optimizePositive = false)
+          writeUnsignedVarInt(msg.pid)
+          // TODO: Write only the bottom two bits of msg.position
+          s.write(msg.position)
+          this
+        }
+      }
+
+    override def deserializeStream(s: InputStream): DeserializationStream =
+      new ShuffleDeserializationStream(s) {
+        override def readObject[T: ClassTag](): T = {
+          val a = readVarLong(optimizePositive = false)
+          val b = readUnsignedVarInt()
+          val c = s.read()
+          if (c == -1) throw new EOFException
+          new RoutingTableMessage(a, b, c.toByte).asInstanceOf[T]
+        }
+      }
+  }
+}
+
+private[graphx]
 class VertexIdMsgSerializer extends Serializer with Serializable {
   override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
new file mode 100644
index 0000000..f4e221d
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/** Stores vertex attributes to ship to an edge partition. */
+private[graphx]
+class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD])
+  extends Serializable {
+  def iterator: Iterator[(VertexId, VD)] =
+    (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
+}
+
+private[graphx]
+object ShippableVertexPartition {
+  /** Construct a `ShippableVertexPartition` from the given vertices without any routing table. */
+  def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): ShippableVertexPartition[VD] =
+    apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD])
+
+  /**
+   * Construct a `ShippableVertexPartition` from the given vertices with the specified routing
+   * table, filling in missing vertices mentioned in the routing table using `defaultVal`.
+   */
+  def apply[VD: ClassTag](
+      iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
+    : ShippableVertexPartition[VD] = {
+    val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
+    val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, (a: VD, b: VD) => a)
+    new ShippableVertexPartition(index, values, mask, routingTable)
+  }
+
+  import scala.language.implicitConversions
+
+  /**
+   * Implicit conversion to allow invoking `VertexPartitionBase` operations directly on a
+   * `ShippableVertexPartition`.
+   */
+  implicit def shippablePartitionToOps[VD: ClassTag](partition: ShippableVertexPartition[VD])
+    : ShippableVertexPartitionOps[VD] = new ShippableVertexPartitionOps(partition)
+
+  /**
+   * Implicit evidence that `ShippableVertexPartition` is a member of the
+   * `VertexPartitionBaseOpsConstructor` typeclass. This enables invoking `VertexPartitionBase`
+   * operations on a `ShippableVertexPartition` via an evidence parameter, as in
+   * [[VertexPartitionBaseOps]].
+   */
+  implicit object ShippableVertexPartitionOpsConstructor
+    extends VertexPartitionBaseOpsConstructor[ShippableVertexPartition] {
+    def toOps[VD: ClassTag](partition: ShippableVertexPartition[VD])
+      : VertexPartitionBaseOps[VD, ShippableVertexPartition] = shippablePartitionToOps(partition)
+  }
+}
+
+/**
+ * A map from vertex id to vertex attribute that additionally stores edge partition join sites for
+ * each vertex attribute, enabling joining with an [[org.apache.spark.graphx.EdgeRDD]].
+ */
+private[graphx]
+class ShippableVertexPartition[VD: ClassTag](
+    val index: VertexIdToIndexMap,
+    val values: Array[VD],
+    val mask: BitSet,
+    val routingTable: RoutingTablePartition)
+  extends VertexPartitionBase[VD] {
+
+  /** Return a new ShippableVertexPartition with the specified routing table. */
+  def withRoutingTable(routingTable_ : RoutingTablePartition): ShippableVertexPartition[VD] = {
+    new ShippableVertexPartition(index, values, mask, routingTable_)
+  }
+
+  /**
+   * Generate a `VertexAttributeBlock` for each edge partition keyed on the edge partition ID. The
+   * `VertexAttributeBlock` contains the vertex attributes from the current partition that are
+   * referenced in the specified positions in the edge partition.
+   */
+  def shipVertexAttributes(
+      shipSrc: Boolean, shipDst: Boolean): Iterator[(PartitionID, VertexAttributeBlock[VD])] = {
+    Iterator.tabulate(routingTable.numEdgePartitions) { pid =>
+      // Shipping both positions visits every vid routed to `pid`, so presize to that count;
+      // otherwise the number visited is not known up front, so start from a capacity of 64.
+      val initialSize = if (shipSrc && shipDst) routingTable.partitionSize(pid) else 64
+      val vids = new PrimitiveVector[VertexId](initialSize)
+      val attrs = new PrimitiveVector[VD](initialSize)
+      routingTable.foreachWithinEdgePartition(pid, shipSrc, shipDst) { vid =>
+        if (isDefined(vid)) {
+          vids += vid
+          attrs += this(vid)
+        }
+      }
+      (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
+    }
+  }
+
+  /**
+   * Generate a `VertexId` array for each edge partition keyed on the edge partition ID. The array
+   * contains the visible vertex ids from the current partition that are referenced in the edge
+   * partition.
+   */
+  def shipVertexIds(): Iterator[(PartitionID, Array[VertexId])] = {
+    Iterator.tabulate(routingTable.numEdgePartitions) { pid =>
+      // Both positions are included, so every vid routed to `pid` is a candidate and the vector,
+      // presized to that count, never holds more than its initial size.
+      val vids = new PrimitiveVector[VertexId](routingTable.partitionSize(pid))
+      routingTable.foreachWithinEdgePartition(pid, includeSrc = true, includeDst = true) { vid =>
+        if (isDefined(vid)) {
+          vids += vid
+        }
+      }
+      (pid, vids.trim().array)
+    }
+  }
+}
+
+private[graphx] class ShippableVertexPartitionOps[VD: ClassTag](self: ShippableVertexPartition[VD])
+  extends VertexPartitionBaseOps[VD, ShippableVertexPartition](self) {
+
+  def withIndex(index: VertexIdToIndexMap): ShippableVertexPartition[VD] = {
+    new ShippableVertexPartition(index, self.values, self.mask, self.routingTable)
+  }
+
+  def withValues[VD2: ClassTag](values: Array[VD2]): ShippableVertexPartition[VD2] = {
+    new ShippableVertexPartition(self.index, values, self.mask, self.routingTable)
+  }
+
+  def withMask(mask: BitSet): ShippableVertexPartition[VD] = {
+    new ShippableVertexPartition(self.index, self.values, mask, self.routingTable)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
index 7a54b41..f1d1747 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
@@ -19,260 +19,59 @@ package org.apache.spark.graphx.impl
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.Logging
+import org.apache.spark.util.collection.BitSet
+
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
-import org.apache.spark.util.collection.BitSet
 
 private[graphx] object VertexPartition {
-
-  def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): VertexPartition[VD] = {
-    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
-    iter.foreach { case (k, v) =>
-      map(k) = v
-    }
-    new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
-  }
-
-  def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
-    : VertexPartition[VD] =
-  {
-    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
-    iter.foreach { case (k, v) =>
-      map.setMerge(k, v, mergeFunc)
-    }
-    new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
-  }
-}
-
-
-private[graphx]
-class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
-    val index: VertexIdToIndexMap,
-    val values: Array[VD],
-    val mask: BitSet,
-    /** A set of vids of active vertices. May contain vids not in index due to join rewrite. */
-    private val activeSet: Option[VertexSet] = None)
-  extends Logging {
-
-  val capacity: Int = index.capacity
-
-  def size: Int = mask.cardinality()
-
-  /** Return the vertex attribute for the given vertex ID. */
-  def apply(vid: VertexId): VD = values(index.getPos(vid))
-
-  def isDefined(vid: VertexId): Boolean = {
-    val pos = index.getPos(vid)
-    pos >= 0 && mask.get(pos)
-  }
-
-  /** Look up vid in activeSet, throwing an exception if it is None. */
-  def isActive(vid: VertexId): Boolean = {
-    activeSet.get.contains(vid)
+  /** Construct a `VertexPartition` from the given vertices. */
+  def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)])
+    : VertexPartition[VD] = {
+    val (index, values, mask) = VertexPartitionBase.initFrom(iter)
+    new VertexPartition(index, values, mask)
   }
 
-  /** The number of active vertices, if any exist. */
-  def numActives: Option[Int] = activeSet.map(_.size)
+  import scala.language.implicitConversions
 
   /**
-   * Pass each vertex attribute along with the vertex id through a map
-   * function and retain the original RDD's partitioning and index.
-   *
-   * @tparam VD2 the type returned by the map function
-   *
-   * @param f the function applied to each vertex id and vertex
-   * attribute in the RDD
-   *
-   * @return a new VertexPartition with values obtained by applying `f` to
-   * each of the entries in the original VertexRDD.  The resulting
-   * VertexPartition retains the same index.
+   * Implicit conversion to allow invoking `VertexPartitionBase` operations directly on a
+   * `VertexPartition`.
    */
-  def map[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexPartition[VD2] = {
-    // Construct a view of the map transformation
-    val newValues = new Array[VD2](capacity)
-    var i = mask.nextSetBit(0)
-    while (i >= 0) {
-      newValues(i) = f(index.getValue(i), values(i))
-      i = mask.nextSetBit(i + 1)
-    }
-    new VertexPartition[VD2](index, newValues, mask)
-  }
-
-  /**
-   * Restrict the vertex set to the set of vertices satisfying the given predicate.
-   *
-   * @param pred the user defined predicate
-   *
-   * @note The vertex set preserves the original index structure which means that the returned
-   *       RDD can be easily joined with the original vertex-set. Furthermore, the filter only
-   *       modifies the bitmap index and so no new values are allocated.
-   */
-  def filter(pred: (VertexId, VD) => Boolean): VertexPartition[VD] = {
-    // Allocate the array to store the results into
-    val newMask = new BitSet(capacity)
-    // Iterate over the active bits in the old mask and evaluate the predicate
-    var i = mask.nextSetBit(0)
-    while (i >= 0) {
-      if (pred(index.getValue(i), values(i))) {
-        newMask.set(i)
-      }
-      i = mask.nextSetBit(i + 1)
-    }
-    new VertexPartition(index, values, newMask)
-  }
+  implicit def partitionToOps[VD: ClassTag](partition: VertexPartition[VD]) =
+    new VertexPartitionOps(partition)
 
   /**
-   * Hides vertices that are the same between this and other. For vertices that are different, keeps
-   * the values from `other`. The indices of `this` and `other` must be the same.
+   * Implicit evidence that `VertexPartition` is a member of the `VertexPartitionBaseOpsConstructor`
+   * typeclass. This enables invoking `VertexPartitionBase` operations on a `VertexPartition` via an
+   * evidence parameter, as in [[VertexPartitionBaseOps]].
    */
-  def diff(other: VertexPartition[VD]): VertexPartition[VD] = {
-    if (index != other.index) {
-      logWarning("Diffing two VertexPartitions with different indexes is slow.")
-      diff(createUsingIndex(other.iterator))
-    } else {
-      val newMask = mask & other.mask
-      var i = newMask.nextSetBit(0)
-      while (i >= 0) {
-        if (values(i) == other.values(i)) {
-          newMask.unset(i)
-        }
-        i = newMask.nextSetBit(i + 1)
-      }
-      new VertexPartition(index, other.values, newMask)
-    }
-  }
-
-  /** Left outer join another VertexPartition. */
-  def leftJoin[VD2: ClassTag, VD3: ClassTag]
-      (other: VertexPartition[VD2])
-      (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
-    if (index != other.index) {
-      logWarning("Joining two VertexPartitions with different indexes is slow.")
-      leftJoin(createUsingIndex(other.iterator))(f)
-    } else {
-      val newValues = new Array[VD3](capacity)
-
-      var i = mask.nextSetBit(0)
-      while (i >= 0) {
-        val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None
-        newValues(i) = f(index.getValue(i), values(i), otherV)
-        i = mask.nextSetBit(i + 1)
-      }
-      new VertexPartition(index, newValues, mask)
-    }
-  }
-
-  /** Left outer join another iterator of messages. */
-  def leftJoin[VD2: ClassTag, VD3: ClassTag]
-      (other: Iterator[(VertexId, VD2)])
-      (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
-    leftJoin(createUsingIndex(other))(f)
-  }
-
-  /** Inner join another VertexPartition. */
-  def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U])
-      (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
-    if (index != other.index) {
-      logWarning("Joining two VertexPartitions with different indexes is slow.")
-      innerJoin(createUsingIndex(other.iterator))(f)
-    } else {
-      val newMask = mask & other.mask
-      val newValues = new Array[VD2](capacity)
-      var i = newMask.nextSetBit(0)
-      while (i >= 0) {
-        newValues(i) = f(index.getValue(i), values(i), other.values(i))
-        i = newMask.nextSetBit(i + 1)
-      }
-      new VertexPartition(index, newValues, newMask)
-    }
-  }
-
-  /**
-   * Inner join an iterator of messages.
-   */
-  def innerJoin[U: ClassTag, VD2: ClassTag]
-      (iter: Iterator[Product2[VertexId, U]])
-      (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
-    innerJoin(createUsingIndex(iter))(f)
+  implicit object VertexPartitionOpsConstructor
+    extends VertexPartitionBaseOpsConstructor[VertexPartition] {
+    def toOps[VD: ClassTag](partition: VertexPartition[VD])
+      : VertexPartitionBaseOps[VD, VertexPartition] = partitionToOps(partition)
   }
+}
 
-  /**
-   * Similar effect as aggregateUsingIndex((a, b) => a)
-   */
-  def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
-    : VertexPartition[VD2] = {
-    val newMask = new BitSet(capacity)
-    val newValues = new Array[VD2](capacity)
-    iter.foreach { case (vid, vdata) =>
-      val pos = index.getPos(vid)
-      if (pos >= 0) {
-        newMask.set(pos)
-        newValues(pos) = vdata
-      }
-    }
-    new VertexPartition[VD2](index, newValues, newMask)
-  }
+/** A map from vertex id to vertex attribute. */
+private[graphx] class VertexPartition[VD: ClassTag](
+    val index: VertexIdToIndexMap,
+    val values: Array[VD],
+    val mask: BitSet)
+  extends VertexPartitionBase[VD]
 
-  /**
-   * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
-   * the partition, hidden by the bitmask.
-   */
-  def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): VertexPartition[VD] = {
-    val newMask = new BitSet(capacity)
-    val newValues = new Array[VD](capacity)
-    System.arraycopy(values, 0, newValues, 0, newValues.length)
-    iter.foreach { case (vid, vdata) =>
-      val pos = index.getPos(vid)
-      if (pos >= 0) {
-        newMask.set(pos)
-        newValues(pos) = vdata
-      }
-    }
-    new VertexPartition(index, newValues, newMask)
-  }
+private[graphx] class VertexPartitionOps[VD: ClassTag](self: VertexPartition[VD])
+  extends VertexPartitionBaseOps[VD, VertexPartition](self) {
 
-  def aggregateUsingIndex[VD2: ClassTag](
-      iter: Iterator[Product2[VertexId, VD2]],
-      reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = {
-    val newMask = new BitSet(capacity)
-    val newValues = new Array[VD2](capacity)
-    iter.foreach { product =>
-      val vid = product._1
-      val vdata = product._2
-      val pos = index.getPos(vid)
-      if (pos >= 0) {
-        if (newMask.get(pos)) {
-          newValues(pos) = reduceFunc(newValues(pos), vdata)
-        } else { // otherwise just store the new value
-          newMask.set(pos)
-          newValues(pos) = vdata
-        }
-      }
-    }
-    new VertexPartition[VD2](index, newValues, newMask)
+  def withIndex(index: VertexIdToIndexMap): VertexPartition[VD] = {
+    new VertexPartition(index, self.values, self.mask)
   }
 
-  def replaceActives(iter: Iterator[VertexId]): VertexPartition[VD] = {
-    val newActiveSet = new VertexSet
-    iter.foreach(newActiveSet.add(_))
-    new VertexPartition(index, values, mask, Some(newActiveSet))
+  def withValues[VD2: ClassTag](values: Array[VD2]): VertexPartition[VD2] = {
+    new VertexPartition(self.index, values, self.mask)
   }
 
-  /**
-   * Construct a new VertexPartition whose index contains only the vertices in the mask.
-   */
-  def reindex(): VertexPartition[VD] = {
-    val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
-    val arbitraryMerge = (a: VD, b: VD) => a
-    for ((k, v) <- this.iterator) {
-      hashMap.setMerge(k, v, arbitraryMerge)
-    }
-    new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
+  def withMask(mask: BitSet): VertexPartition[VD] = {
+    new VertexPartition(self.index, self.values, mask)
   }
-
-  def iterator: Iterator[(VertexId, VD)] =
-    mask.iterator.map(ind => (index.getValue(ind), values(ind)))
-
-  def vidIterator: Iterator[VertexId] = mask.iterator.map(ind => index.getValue(ind))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
new file mode 100644
index 0000000..8d9e020
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+private[graphx] object VertexPartitionBase {
+  /**
+   * Construct the constituents of a VertexPartitionBase from the given vertices, merging duplicate
+   * entries arbitrarily.
+   */
+  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
+    : (VertexIdToIndexMap, Array[VD], BitSet) = {
+    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    iter.foreach { pair =>
+      map(pair._1) = pair._2
+    }
+    (map.keySet, map._values, map.keySet.getBitSet)
+  }
+
+  /**
+   * Construct the constituents of a VertexPartitionBase from the given vertices, merging duplicate
+   * entries using `mergeFunc`.
+   */
+  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
+    : (VertexIdToIndexMap, Array[VD], BitSet) = {
+    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    iter.foreach { pair =>
+      map.setMerge(pair._1, pair._2, mergeFunc)
+    }
+    (map.keySet, map._values, map.keySet.getBitSet)
+  }
+}
+
+/**
+ * An abstract map from vertex id to vertex attribute. [[VertexPartition]] is the corresponding
+ * concrete implementation. [[VertexPartitionBaseOps]] provides a variety of operations for
+ * VertexPartitionBase and subclasses that provide implicit evidence of membership in the
+ * `VertexPartitionBaseOpsConstructor` typeclass (for example,
+ * [[VertexPartition.VertexPartitionOpsConstructor]]).
+ */
+private[graphx] abstract class VertexPartitionBase[@specialized(Long, Int, Double) VD: ClassTag] {
+
+  def index: VertexIdToIndexMap
+  def values: Array[VD]
+  def mask: BitSet
+
+  val capacity: Int = index.capacity
+
+  def size: Int = mask.cardinality()
+
+  /** Return the vertex attribute for the given vertex ID. */
+  def apply(vid: VertexId): VD = values(index.getPos(vid))
+
+  def isDefined(vid: VertexId): Boolean = {
+    val pos = index.getPos(vid)
+    pos >= 0 && mask.get(pos)
+  }
+
+  def iterator: Iterator[(VertexId, VD)] =
+    mask.iterator.map(ind => (index.getValue(ind), values(ind)))
+}
+
+/**
+ * A typeclass for subclasses of `VertexPartitionBase` representing the ability to wrap them in a
+ * `VertexPartitionBaseOps`.
+ */
+private[graphx] trait VertexPartitionBaseOpsConstructor[T[X] <: VertexPartitionBase[X]] {
+  def toOps[VD: ClassTag](partition: T[VD]): VertexPartitionBaseOps[VD, T]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
new file mode 100644
index 0000000..21ff615
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+import org.apache.spark.Logging
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A class containing additional operations for subclasses of VertexPartitionBase that provide
+ * implicit evidence of membership in the `VertexPartitionBaseOpsConstructor` typeclass (for
+ * example, [[VertexPartition.VertexPartitionOpsConstructor]]).
+ */
+private[graphx] abstract class VertexPartitionBaseOps
+    [VD: ClassTag, Self[X] <: VertexPartitionBase[X] : VertexPartitionBaseOpsConstructor]
+    (self: Self[VD])
+    extends Logging {
+
+  def withIndex(index: VertexIdToIndexMap): Self[VD]
+  def withValues[VD2: ClassTag](values: Array[VD2]): Self[VD2]
+  def withMask(mask: BitSet): Self[VD]
+
+  /**
+   * Pass each vertex attribute along with the vertex id through a map
+   * function and retain the original RDD's partitioning and index.
+   *
+   * @tparam VD2 the type returned by the map function
+   *
+   * @param f the function applied to each vertex id and vertex
+   * attribute in the RDD
+   *
+   * @return a new VertexPartition with values obtained by applying `f` to
+   * each of the entries in the original VertexRDD.  The resulting
+   * VertexPartition retains the same index.
+   */
+  def map[VD2: ClassTag](f: (VertexId, VD) => VD2): Self[VD2] = {
+    // Construct a view of the map transformation
+    val newValues = new Array[VD2](self.capacity)
+    var i = self.mask.nextSetBit(0)
+    while (i >= 0) {
+      newValues(i) = f(self.index.getValue(i), self.values(i))
+      i = self.mask.nextSetBit(i + 1)
+    }
+    this.withValues(newValues)
+  }
+
+  /**
+   * Restrict the vertex set to the set of vertices satisfying the given predicate.
+   *
+   * @param pred the user defined predicate
+   *
+   * @note The vertex set preserves the original index structure which means that the returned
+   *       RDD can be easily joined with the original vertex-set. Furthermore, the filter only
+   *       modifies the bitmap index and so no new values are allocated.
+   */
+  def filter(pred: (VertexId, VD) => Boolean): Self[VD] = {
+    // Allocate the array to store the results into
+    val newMask = new BitSet(self.capacity)
+    // Iterate over the active bits in the old mask and evaluate the predicate
+    var i = self.mask.nextSetBit(0)
+    while (i >= 0) {
+      if (pred(self.index.getValue(i), self.values(i))) {
+        newMask.set(i)
+      }
+      i = self.mask.nextSetBit(i + 1)
+    }
+    this.withMask(newMask)
+  }
+
+  /**
+   * Hides vertices that are the same between this and other. For vertices that are different, keeps
+   * the values from `other`. Differing indices are supported but slow (`other` is re-indexed).
+   */
+  def diff(other: Self[VD]): Self[VD] = {
+    if (self.index != other.index) {
+      logWarning("Diffing two VertexPartitions with different indexes is slow.")
+      diff(createUsingIndex(other.iterator))
+    } else {
+      val newMask = self.mask & other.mask
+      var i = newMask.nextSetBit(0)
+      while (i >= 0) {
+        if (self.values(i) == other.values(i)) {
+          newMask.unset(i)
+        }
+        i = newMask.nextSetBit(i + 1)
+      }
+      this.withValues(other.values).withMask(newMask)
+    }
+  }
+
+  /** Left outer join another VertexPartition. */
+  def leftJoin[VD2: ClassTag, VD3: ClassTag]
+      (other: Self[VD2])
+      (f: (VertexId, VD, Option[VD2]) => VD3): Self[VD3] = {
+    if (self.index != other.index) {
+      logWarning("Joining two VertexPartitions with different indexes is slow.")
+      leftJoin(createUsingIndex(other.iterator))(f)
+    } else {
+      val newValues = new Array[VD3](self.capacity)
+
+      var i = self.mask.nextSetBit(0)
+      while (i >= 0) {
+        val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None
+        newValues(i) = f(self.index.getValue(i), self.values(i), otherV)
+        i = self.mask.nextSetBit(i + 1)
+      }
+      this.withValues(newValues)
+    }
+  }
+
+  /** Left outer join another iterator of messages. */
+  def leftJoin[VD2: ClassTag, VD3: ClassTag]
+      (other: Iterator[(VertexId, VD2)])
+      (f: (VertexId, VD, Option[VD2]) => VD3): Self[VD3] = {
+    leftJoin(createUsingIndex(other))(f)
+  }
+
+  /** Inner join another VertexPartition. */
+  def innerJoin[U: ClassTag, VD2: ClassTag]
+      (other: Self[U])
+      (f: (VertexId, VD, U) => VD2): Self[VD2] = {
+    if (self.index != other.index) {
+      logWarning("Joining two VertexPartitions with different indexes is slow.")
+      innerJoin(createUsingIndex(other.iterator))(f)
+    } else {
+      val newMask = self.mask & other.mask
+      val newValues = new Array[VD2](self.capacity)
+      var i = newMask.nextSetBit(0)
+      while (i >= 0) {
+        newValues(i) = f(self.index.getValue(i), self.values(i), other.values(i))
+        i = newMask.nextSetBit(i + 1)
+      }
+      this.withValues(newValues).withMask(newMask)
+    }
+  }
+
+  /**
+   * Inner join an iterator of messages.
+   */
+  def innerJoin[U: ClassTag, VD2: ClassTag]
+      (iter: Iterator[Product2[VertexId, U]])
+      (f: (VertexId, VD, U) => VD2): Self[VD2] = {
+    innerJoin(createUsingIndex(iter))(f)
+  }
+
+  /**
+   * Similar effect as aggregateUsingIndex((a, b) => b): for duplicate ids the last value wins.
+   */
+  def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
+    : Self[VD2] = {
+    val newMask = new BitSet(self.capacity)
+    val newValues = new Array[VD2](self.capacity)
+    iter.foreach { pair =>
+      val pos = self.index.getPos(pair._1)
+      if (pos >= 0) {
+        newMask.set(pos)
+        newValues(pos) = pair._2
+      }
+    }
+    this.withValues(newValues).withMask(newMask)
+  }
+
+  /**
+   * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
+   * the partition, hidden by the bitmask.
+   */
+  def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): Self[VD] = {
+    val newMask = new BitSet(self.capacity)
+    val newValues = new Array[VD](self.capacity)
+    System.arraycopy(self.values, 0, newValues, 0, newValues.length)
+    iter.foreach { pair =>
+      val pos = self.index.getPos(pair._1)
+      if (pos >= 0) {
+        newMask.set(pos)
+        newValues(pos) = pair._2
+      }
+    }
+    this.withValues(newValues).withMask(newMask)
+  }
+
+  def aggregateUsingIndex[VD2: ClassTag](
+      iter: Iterator[Product2[VertexId, VD2]],
+      reduceFunc: (VD2, VD2) => VD2): Self[VD2] = {
+    val newMask = new BitSet(self.capacity)
+    val newValues = new Array[VD2](self.capacity)
+    iter.foreach { product =>
+      val vid = product._1
+      val vdata = product._2
+      val pos = self.index.getPos(vid)
+      if (pos >= 0) {
+        if (newMask.get(pos)) {
+          newValues(pos) = reduceFunc(newValues(pos), vdata)
+        } else { // otherwise just store the new value
+          newMask.set(pos)
+          newValues(pos) = vdata
+        }
+      }
+    }
+    this.withValues(newValues).withMask(newMask)
+  }
+
+  /**
+   * Construct a new VertexPartition whose index contains only the vertices in the mask.
+   */
+  def reindex(): Self[VD] = {
+    val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val arbitraryMerge = (a: VD, b: VD) => a
+    for ((k, v) <- self.iterator) {
+      hashMap.setMerge(k, v, arbitraryMerge)
+    }
+    this.withIndex(hashMap.keySet).withValues(hashMap._values).withMask(hashMap.keySet.getBitSet)
+  }
+
+  /**
+   * Converts a vertex partition (in particular, one of type `Self`) into a
+   * `VertexPartitionBaseOps`. Within this class, this allows chaining the methods defined above,
+   * because these methods return a `Self` and this implicit conversion re-wraps that in a
+   * `VertexPartitionBaseOps`. This relies on the context bound on `Self`.
+   */
+  private implicit def toOps[VD2: ClassTag](
+      partition: Self[VD2]): VertexPartitionBaseOps[VD2, Self] = {
+    implicitly[VertexPartitionBaseOpsConstructor[Self]].toOps(partition)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index d901d4f..069e042 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -55,6 +55,7 @@ object Analytics extends Logging {
     val conf = new SparkConf()
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+      .set("spark.locality.wait", "100000")
 
     taskType match {
       case "pagerank" =>
@@ -62,12 +63,14 @@ object Analytics extends Logging {
         var outFname = ""
         var numEPart = 4
         var partitionStrategy: Option[PartitionStrategy] = None
+        var numIterOpt: Option[Int] = None
 
         options.foreach{
           case ("tol", v) => tol = v.toFloat
           case ("output", v) => outFname = v
           case ("numEPart", v) => numEPart = v.toInt
           case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+          case ("numIter", v) => numIterOpt = Some(v.toInt)
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
 
@@ -84,7 +87,10 @@ object Analytics extends Logging {
         println("GRAPHX: Number of vertices " + graph.vertices.count)
         println("GRAPHX: Number of edges " + graph.edges.count)
 
-        val pr = graph.pageRank(tol).vertices.cache()
+        val pr = (numIterOpt match {
+          case Some(numIter) => PageRank.run(graph, numIter)
+          case None => PageRank.runUntilConvergence(graph, tol)
+        }).vertices.cache()
 
         println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 32b5fe4..7b9bac5 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -110,7 +110,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val p = 100
       val verts = 1 to n
       val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
-        verts.filter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0)
+        verts.withFilter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0)
       assert(graph.edges.partitions.length === p)
       val partitionedGraph = graph.partitionBy(EdgePartition2D)
       assert(graph.edges.partitions.length === p)
@@ -120,7 +120,13 @@ class GraphSuite extends FunSuite with LocalSparkContext {
         val part = iter.next()._2
         Iterator((part.srcIds ++ part.dstIds).toSet)
       }.collect
-      assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound))
+      if (!verts.forall(id => partitionSets.count(_.contains(id)) <= bound)) {
+        val numFailures = verts.count(id => partitionSets.count(_.contains(id)) > bound)
+        val failure = verts.maxBy(id => partitionSets.count(_.contains(id)))
+        fail(("Replication bound test failed for %d/%d vertices. " +
+          "Example: vertex %d replicated to %d (> %f) partitions.").format(
+          numFailures, n, failure, partitionSets.count(_.contains(failure)), bound))
+      }
       // This should not be true for the default hash partitioning
       val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter =>
         val part = iter.next()._2

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index e135d1d..d2e0c01 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -26,10 +26,16 @@ import org.apache.spark.graphx._
 
 class EdgePartitionSuite extends FunSuite {
 
+  def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A, Int] = {
+    val builder = new EdgePartitionBuilder[A, Int]
+    for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) }
+    builder.toEdgePartition
+  }
+
   test("reverse") {
     val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
     val reversedEdges = List(Edge(0, 2, 0), Edge(1, 0, 0), Edge(2, 1, 0))
-    val builder = new EdgePartitionBuilder[Int]
+    val builder = new EdgePartitionBuilder[Int, Nothing]
     for (e <- edges) {
       builder.add(e.srcId, e.dstId, e.attr)
     }
@@ -40,7 +46,7 @@ class EdgePartitionSuite extends FunSuite {
 
   test("map") {
     val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
-    val builder = new EdgePartitionBuilder[Int]
+    val builder = new EdgePartitionBuilder[Int, Nothing]
     for (e <- edges) {
       builder.add(e.srcId, e.dstId, e.attr)
     }
@@ -49,11 +55,22 @@ class EdgePartitionSuite extends FunSuite {
       edges.map(e => e.copy(attr = e.srcId + e.dstId)))
   }
 
+  test("filter") {
+    val edges = List(Edge(0, 1, 0), Edge(0, 2, 0), Edge(2, 0, 0))
+    val builder = new EdgePartitionBuilder[Int, Int]
+    for (e <- edges) {
+      builder.add(e.srcId, e.dstId, e.attr)
+    }
+    val edgePartition = builder.toEdgePartition
+    val filtered = edgePartition.filter(et => et.srcId == 0, (vid, attr) => vid == 0 || vid == 1)
+    assert(filtered.tripletIterator().toList.map(et => (et.srcId, et.dstId)) === List((0L, 1L)))
+  }
+
   test("groupEdges") {
     val edges = List(
       Edge(0, 1, 1), Edge(1, 2, 2), Edge(2, 0, 4), Edge(0, 1, 8), Edge(1, 2, 16), Edge(2, 0, 32))
     val groupedEdges = List(Edge(0, 1, 9), Edge(1, 2, 18), Edge(2, 0, 36))
-    val builder = new EdgePartitionBuilder[Int]
+    val builder = new EdgePartitionBuilder[Int, Nothing]
     for (e <- edges) {
       builder.add(e.srcId, e.dstId, e.attr)
     }
@@ -61,11 +78,19 @@ class EdgePartitionSuite extends FunSuite {
     assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges)
   }
 
+  test("upgradeIterator") {
+    val edges = List((0, 1, 0), (1, 0, 0))
+    val verts = List((0L, 1), (1L, 2))
+    val part = makeEdgePartition(edges).updateVertices(verts.iterator)
+    assert(part.upgradeIterator(part.iterator).map(_.toTuple).toList ===
+      part.tripletIterator().toList.map(_.toTuple))
+  }
+
   test("indexIterator") {
     val edgesFrom0 = List(Edge(0, 1, 0))
     val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
     val sortedEdges = edgesFrom0 ++ edgesFrom1
-    val builder = new EdgePartitionBuilder[Int]
+    val builder = new EdgePartitionBuilder[Int, Nothing]
     for (e <- Random.shuffle(sortedEdges)) {
       builder.add(e.srcId, e.dstId, e.attr)
     }
@@ -77,11 +102,6 @@ class EdgePartitionSuite extends FunSuite {
   }
 
   test("innerJoin") {
-    def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
-      val builder = new EdgePartitionBuilder[A]
-      for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) }
-      builder.toEdgePartition
-    }
     val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
     val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
     val a = makeEdgePartition(aList)
@@ -90,4 +110,14 @@ class EdgePartitionSuite extends FunSuite {
     assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList ===
       List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
   }
+
+  test("isActive, numActives, replaceActives") {
+    val ep = new EdgePartitionBuilder[Nothing, Nothing].toEdgePartition
+      .withActiveSet(Iterator(0L, 2L, 0L))
+    assert(ep.isActive(0))
+    assert(!ep.isActive(1))
+    assert(ep.isActive(2))
+    assert(!ep.isActive(-1))
+    assert(ep.numActives == Some(2))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
index 9cbb2d2..49b2704 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
@@ -26,17 +26,11 @@ import org.apache.spark.graphx._
 
 class EdgeTripletIteratorSuite extends FunSuite {
   test("iterator.toList") {
-    val builder = new EdgePartitionBuilder[Int]
+    val builder = new EdgePartitionBuilder[Int, Int]
     builder.add(1, 2, 0)
     builder.add(1, 3, 0)
     builder.add(1, 4, 0)
-    val vidmap = new VertexIdToIndexMap
-    vidmap.add(1)
-    vidmap.add(2)
-    vidmap.add(3)
-    vidmap.add(4)
-    val vs = Array.fill(vidmap.capacity)(0)
-    val iter = new EdgeTripletIterator[Int, Int](vidmap, vs, builder.toEdgePartition)
+    val iter = new EdgeTripletIterator[Int, Int](builder.toEdgePartition, true, true)
     val result = iter.toList.map(et => (et.srcId, et.dstId))
     assert(result === Seq((1, 2), (1, 3), (1, 4)))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
index a048d13..8bf1384 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
@@ -30,17 +30,6 @@ class VertexPartitionSuite extends FunSuite {
     assert(!vp.isDefined(-1))
   }
 
-  test("isActive, numActives, replaceActives") {
-    val vp = VertexPartition(Iterator((0L, 1), (1L, 1)))
-      .filter { (vid, attr) => vid == 0 }
-      .replaceActives(Iterator(0, 2, 0))
-    assert(vp.isActive(0))
-    assert(!vp.isActive(1))
-    assert(vp.isActive(2))
-    assert(!vp.isActive(-1))
-    assert(vp.numActives == Some(2))
-  }
-
   test("map") {
     val vp = VertexPartition(Iterator((0L, 1), (1L, 1))).map { (vid, attr) => 2 }
     assert(vp(0) === 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/project/MimaBuild.scala
----------------------------------------------------------------------
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index efdb38e..fafc9b3 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -76,6 +76,8 @@ object MimaBuild {
           excludeSparkClass("util.XORShiftRandom") ++
           excludeSparkClass("graphx.EdgeRDD") ++
           excludeSparkClass("graphx.VertexRDD") ++
+          excludeSparkClass("graphx.impl.GraphImpl") ++
+          excludeSparkClass("graphx.impl.RoutingTable") ++
           excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
           excludeSparkClass("mllib.optimization.SquaredGradient") ++
           excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++


[2/2] git commit: Unify GraphImpl RDDs + other graph load optimizations

Posted by pw...@apache.org.
Unify GraphImpl RDDs + other graph load optimizations

This PR makes the following changes, primarily in e4fbd329aef85fe2c38b0167255d2a712893d683:

1. *Unify RDDs to avoid zipPartitions.* A graph used to be four RDDs: vertices, edges, routing table, and triplet view. This commit merges them down to two: vertices (with routing table), and edges (with replicated vertices).

2. *Avoid duplicate shuffle in graph building.* We used to do two shuffles when building a graph: one to extract routing information from the edges and move it to the vertices, and another to find nonexistent vertices referred to by edges. With this commit, the latter is done as a side effect of the former.

3. *Avoid no-op shuffle when joins are fully eliminated.* This is a side effect of unifying the edges and the triplet view.

4. *Join elimination for mapTriplets.*

5. *Ship only the needed vertex attributes when upgrading the triplet view.* If the triplet view already contains source attributes, and we now need both attributes, only ship destination attributes rather than re-shipping both. This is done in `ReplicatedVertexView#upgrade`.

Author: Ankur Dave <an...@gmail.com>

Closes #497 from ankurdave/unify-rdds and squashes the following commits:

332ab43 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds
4933e2e [Ankur Dave] Exclude RoutingTable from binary compatibility check
5ba8789 [Ankur Dave] Add GraphX upgrade guide from Spark 0.9.1
13ac845 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds
a04765c [Ankur Dave] Remove unnecessary toOps call
57202e8 [Ankur Dave] Replace case with pair parameter
75af062 [Ankur Dave] Add explicit return types
04d3ae5 [Ankur Dave] Convert implicit parameter to context bound
c88b269 [Ankur Dave] Revert upgradeIterator to if-in-a-loop
0d3584c [Ankur Dave] EdgePartition.size should be val
2a928b2 [Ankur Dave] Set locality wait
10b3596 [Ankur Dave] Clean up public API
ae36110 [Ankur Dave] Fix style errors
e4fbd32 [Ankur Dave] Unify GraphImpl RDDs + other graph load optimizations
d6d60e2 [Ankur Dave] In GraphLoader, coalesce to minEdgePartitions
62c7b78 [Ankur Dave] In Analytics, take PageRank numIter
d64e8d4 [Ankur Dave] Log current Pregel iteration


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/905173df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/905173df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/905173df

Branch: refs/heads/master
Commit: 905173df57b90f90ebafb22e43f55164445330e6
Parents: 6c2691d
Author: Ankur Dave <an...@gmail.com>
Authored: Sat May 10 14:48:07 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sat May 10 14:48:07 2014 -0700

----------------------------------------------------------------------
 docs/graphx-programming-guide.md                |  22 +-
 .../scala/org/apache/spark/graphx/EdgeRDD.scala |  56 +--
 .../org/apache/spark/graphx/EdgeTriplet.scala   |   2 +
 .../scala/org/apache/spark/graphx/Graph.scala   |   2 +-
 .../spark/graphx/GraphKryoRegistrator.scala     |   8 +-
 .../org/apache/spark/graphx/GraphLoader.scala   |  10 +-
 .../org/apache/spark/graphx/GraphOps.scala      |  17 +-
 .../scala/org/apache/spark/graphx/Pregel.scala  |   6 +-
 .../org/apache/spark/graphx/VertexRDD.scala     | 166 ++++++---
 .../spark/graphx/impl/EdgePartition.scala       | 132 +++++--
 .../graphx/impl/EdgePartitionBuilder.scala      |  18 +-
 .../spark/graphx/impl/EdgeTripletIterator.scala |  50 ++-
 .../apache/spark/graphx/impl/GraphImpl.scala    | 344 ++++++++-----------
 .../spark/graphx/impl/MessageToPartition.scala  |  21 +-
 .../graphx/impl/ReplicatedVertexView.scala      | 238 ++++---------
 .../apache/spark/graphx/impl/RoutingTable.scala |  82 -----
 .../graphx/impl/RoutingTablePartition.scala     | 158 +++++++++
 .../apache/spark/graphx/impl/Serializers.scala  |  29 ++
 .../graphx/impl/ShippableVertexPartition.scala  | 149 ++++++++
 .../spark/graphx/impl/VertexPartition.scala     | 269 ++-------------
 .../spark/graphx/impl/VertexPartitionBase.scala |  91 +++++
 .../graphx/impl/VertexPartitionBaseOps.scala    | 245 +++++++++++++
 .../org/apache/spark/graphx/lib/Analytics.scala |   8 +-
 .../org/apache/spark/graphx/GraphSuite.scala    |  10 +-
 .../spark/graphx/impl/EdgePartitionSuite.scala  |  48 ++-
 .../graphx/impl/EdgeTripletIteratorSuite.scala  |  10 +-
 .../graphx/impl/VertexPartitionSuite.scala      |  11 -
 project/MimaBuild.scala                         |   2 +
 28 files changed, 1353 insertions(+), 851 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/docs/graphx-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 07be8ba..42ab27b 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -86,6 +86,12 @@ support the [Bagel API](api/scala/index.html#org.apache.spark.bagel.package) and
 [Bagel programming guide](bagel-programming-guide.html). However, we encourage Bagel users to
 explore the new GraphX API and comment on issues that may complicate the transition from Bagel.
 
+## Upgrade Guide from Spark 0.9.1
+
+GraphX in Spark {{site.SPARK_VERSION}} contains one user-facing interface change from Spark 0.9.1. [`EdgeRDD`][EdgeRDD] may now store adjacent vertex attributes to construct the triplets, so it has gained a type parameter. The edges of a graph of type `Graph[VD, ED]` are of type `EdgeRDD[ED, VD]` rather than `EdgeRDD[ED]`.
+
+[EdgeRDD]: api/scala/index.html#org.apache.spark.graphx.EdgeRDD
+
 # Getting Started
 
 To get started you first need to import Spark and GraphX into your project, as follows:
@@ -145,12 +151,12 @@ the vertices and edges of the graph:
 {% highlight scala %}
 class Graph[VD, ED] {
   val vertices: VertexRDD[VD]
-  val edges: EdgeRDD[ED]
+  val edges: EdgeRDD[ED, VD]
 }
 {% endhighlight %}
 
-The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexID,
-VD)]` and `RDD[Edge[ED]]` respectively.  Both `VertexRDD[VD]` and `EdgeRDD[ED]` provide  additional
+The classes `VertexRDD[VD]` and `EdgeRDD[ED, VD]` extend and are optimized versions of `RDD[(VertexID,
+VD)]` and `RDD[Edge[ED]]` respectively.  Both `VertexRDD[VD]` and `EdgeRDD[ED, VD]` provide  additional
 functionality built around graph computation and leverage internal optimizations.  We discuss the
 `VertexRDD` and `EdgeRDD` API in greater detail in the section on [vertex and edge
 RDDs](#vertex_and_edge_rdds) but for now they can be thought of as simply RDDs of the form:
@@ -302,7 +308,7 @@ class Graph[VD, ED] {
   val degrees: VertexRDD[Int]
   // Views of the graph as collections =============================================================
   val vertices: VertexRDD[VD]
-  val edges: EdgeRDD[ED]
+  val edges: EdgeRDD[ED, VD]
   val triplets: RDD[EdgeTriplet[VD, ED]]
   // Functions for caching graphs ==================================================================
   def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
@@ -908,7 +914,7 @@ val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
 
 ## EdgeRDDs
 
-The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]` organizes the edges in blocks partitioned using one
+The `EdgeRDD[ED, VD]`, which extends `RDD[Edge[ED]]`, organizes the edges in blocks partitioned using one
 of the various partitioning strategies defined in [`PartitionStrategy`][PartitionStrategy].  Within
 each partition, edge attributes and adjacency structure are stored separately, enabling maximum
 reuse when changing attribute values.
@@ -918,11 +924,11 @@ reuse when changing attribute values.
 The three additional functions exposed by the `EdgeRDD` are:
 {% highlight scala %}
 // Transform the edge attributes while preserving the structure
-def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
+def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
 // Reverse the edges, reusing both attributes and structure
-def reverse: EdgeRDD[ED]
+def reverse: EdgeRDD[ED, VD]
 // Join two `EdgeRDD`s partitioned using the same partitioning strategy.
-def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
+def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
 {% endhighlight %}
 
 In most applications we have found that operations on the `EdgeRDD` are accomplished through the

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index fa78ca9..a8fc095 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -20,16 +20,19 @@ package org.apache.spark.graphx
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
-import org.apache.spark.graphx.impl.EdgePartition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
+import org.apache.spark.graphx.impl.EdgePartition
+
 /**
- * `EdgeRDD[ED]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition
- * for performance.
+ * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
+ * partition for performance. It may additionally store the vertex attributes associated with each
+ * edge to provide the triplet view. Shipping of the vertex attributes is managed by
+ * `impl.ReplicatedVertexView`.
  */
-class EdgeRDD[@specialized ED: ClassTag](
-    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
+class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
+    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
   extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   partitionsRDD.setName("EdgeRDD")
@@ -45,8 +48,12 @@ class EdgeRDD[@specialized ED: ClassTag](
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
-    p.next._2.iterator.map(_.copy())
+    val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
+    if (p.hasNext) {
+      p.next._2.iterator.map(_.copy())
+    } else {
+      Iterator.empty
+    }
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -61,11 +68,15 @@ class EdgeRDD[@specialized ED: ClassTag](
     this
   }
 
-  private[graphx] def mapEdgePartitions[ED2: ClassTag](
-      f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
-    new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
-      val (pid, ep) = iter.next()
-      Iterator(Tuple2(pid, f(pid, ep)))
+  private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
+    new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+      if (iter.hasNext) {
+        val (pid, ep) = iter.next()
+        Iterator(Tuple2(pid, f(pid, ep)))
+      } else {
+        Iterator.empty
+      }
     }, preservesPartitioning = true))
   }
 
@@ -76,7 +87,7 @@ class EdgeRDD[@specialized ED: ClassTag](
    * @param f the function from an edge to a new edge value
    * @return a new EdgeRDD containing the new edge values
    */
-  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] =
+  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
     mapEdgePartitions((pid, part) => part.map(f))
 
   /**
@@ -84,7 +95,14 @@ class EdgeRDD[@specialized ED: ClassTag](
    *
    * @return a new EdgeRDD containing all the edges reversed
    */
-  def reverse: EdgeRDD[ED] = mapEdgePartitions((pid, part) => part.reverse)
+  def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+
+  /** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
+  def filter(
+      epred: EdgeTriplet[VD, ED] => Boolean,
+      vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
+    mapEdgePartitions((pid, part) => part.filter(epred, vpred))
+  }
 
   /**
    * Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -96,19 +114,15 @@ class EdgeRDD[@specialized ED: ClassTag](
    *         with values supplied by `f`
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
-      (other: EdgeRDD[ED2])
-      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
+      (other: EdgeRDD[ED2, _])
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
     val ed2Tag = classTag[ED2]
     val ed3Tag = classTag[ED3]
-    new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+    new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
       (thisIter, otherIter) =>
         val (pid, thisEPart) = thisIter.next()
         val (_, otherEPart) = otherIter.next()
         Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
     })
   }
-
-  private[graphx] def collectVertexIds(): RDD[VertexId] = {
-    partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index dfc6a80..9d473d5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
     if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
 
   override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+
+  def toTuple: ((VertexId, VD), (VertexId, VD), ED) = ((srcId, srcAttr), (dstId, dstAttr), attr)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 5039586..dc5dac4 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * along with their vertex data.
    *
    */
-  @transient val edges: EdgeRDD[ED]
+  @transient val edges: EdgeRDD[ED, VD]
 
   /**
    * An RDD containing the edge triplets, which are edges along with the vertex data associated with

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
index dd380d8..d295d01 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -19,10 +19,11 @@ package org.apache.spark.graphx
 
 import com.esotericsoftware.kryo.Kryo
 
-import org.apache.spark.graphx.impl._
 import org.apache.spark.serializer.KryoRegistrator
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx.impl._
 
 /**
  * Registers GraphX classes with Kryo for improved performance.
@@ -33,8 +34,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[Edge[Object]])
     kryo.register(classOf[MessageToPartition[Object]])
     kryo.register(classOf[VertexBroadcastMsg[Object]])
+    kryo.register(classOf[RoutingTableMessage])
     kryo.register(classOf[(VertexId, Object)])
-    kryo.register(classOf[EdgePartition[Object]])
+    kryo.register(classOf[EdgePartition[Object, Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])
     kryo.register(classOf[VertexAttributeBlock[Object]])

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 1885846..389490c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -47,8 +47,7 @@ object GraphLoader extends Logging {
    * @param path the path to the file (e.g., /home/data/file or hdfs://file)
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
-   * @param minEdgePartitions the number of partitions for the
-   *        the edge RDD
+   * @param minEdgePartitions the number of partitions for the edge RDD
    */
   def edgeListFile(
       sc: SparkContext,
@@ -60,8 +59,9 @@ object GraphLoader extends Logging {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions
-    val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[Int]
+    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
+    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
+      val builder = new EdgePartitionBuilder[Int, Int]
       iter.foreach { line =>
         if (!line.isEmpty && line(0) != '#') {
           val lineArray = line.split("\\s+")
@@ -78,7 +78,7 @@ object GraphLoader extends Logging {
         }
       }
       Iterator((pid, builder.toEdgePartition))
-    }.cache()
+    }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
     edges.count()
 
     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 4997fbc..edd5b79 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.graphx
 
 import scala.reflect.ClassTag
-import org.apache.spark.SparkContext._
+import scala.util.Random
+
 import org.apache.spark.SparkException
-import org.apache.spark.graphx.lib._
+import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
-import scala.util.Random
+
+import org.apache.spark.graphx.lib._
 
 /**
  * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -43,19 +45,22 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
    * The in-degree of each vertex in the graph.
    * @note Vertices with no in-edges are not returned in the resulting RDD.
    */
-  @transient lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
+  @transient lazy val inDegrees: VertexRDD[Int] =
+    degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")
 
   /**
    * The out-degree of each vertex in the graph.
    * @note Vertices with no out-edges are not returned in the resulting RDD.
    */
-  @transient lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
+  @transient lazy val outDegrees: VertexRDD[Int] =
+    degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")
 
   /**
    * The degree of each vertex in the graph.
    * @note Vertices with no edges are not returned in the resulting RDD.
    */
-  @transient lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
+  @transient lazy val degrees: VertexRDD[Int] =
+    degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")
 
   /**
    * Computes the neighboring vertex degrees.

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index ac07a59..4572eab 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.graphx
 
 import scala.reflect.ClassTag
+import org.apache.spark.Logging
 
 
 /**
@@ -52,7 +53,7 @@ import scala.reflect.ClassTag
  * }}}
  *
  */
-object Pregel {
+object Pregel extends Logging {
 
   /**
    * Execute a Pregel-like iterative vertex-parallel abstraction.  The
@@ -142,6 +143,9 @@ object Pregel {
       // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
       // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
       activeMessages = messages.count()
+
+      logInfo("Pregel finished iteration " + i)
+
       // Unpersist the RDDs hidden by newly-materialized RDDs
       oldMessages.unpersist(blocking=false)
       newVerts.unpersist(blocking=false)

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index f0fc605..8c62897 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -24,8 +24,11 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.rdd._
 import org.apache.spark.storage.StorageLevel
 
-import org.apache.spark.graphx.impl.MsgRDDFunctions
-import org.apache.spark.graphx.impl.VertexPartition
+import org.apache.spark.graphx.impl.RoutingTablePartition
+import org.apache.spark.graphx.impl.ShippableVertexPartition
+import org.apache.spark.graphx.impl.VertexAttributeBlock
+import org.apache.spark.graphx.impl.RoutingTableMessageRDDFunctions._
+import org.apache.spark.graphx.impl.VertexRDDFunctions._
 
 /**
  * Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
@@ -33,6 +36,9 @@ import org.apache.spark.graphx.impl.VertexPartition
  * joined efficiently. All operations except [[reindex]] preserve the index. To construct a
  * `VertexRDD`, use the [[org.apache.spark.graphx.VertexRDD$ VertexRDD object]].
  *
+ * Additionally, stores routing information to enable joining the vertex attributes with an
+ * [[EdgeRDD]].
+ *
  * @example Construct a `VertexRDD` from a plain RDD:
  * {{{
  * // Construct an initial vertex set
@@ -50,13 +56,11 @@ import org.apache.spark.graphx.impl.VertexPartition
  * @tparam VD the vertex attribute associated with each vertex in the set.
  */
 class VertexRDD[@specialized VD: ClassTag](
-    val partitionsRDD: RDD[VertexPartition[VD]])
+    val partitionsRDD: RDD[ShippableVertexPartition[VD]])
   extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   require(partitionsRDD.partitioner.isDefined)
 
-  partitionsRDD.setName("VertexRDD")
-
   /**
    * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
    * VertexRDD will be based on a different index and can no longer be quickly joined with this
@@ -71,6 +75,16 @@ class VertexRDD[@specialized VD: ClassTag](
   override protected def getPreferredLocations(s: Partition): Seq[String] =
     partitionsRDD.preferredLocations(s)
 
+  override def setName(_name: String): this.type = {
+    if (partitionsRDD.name != null) {
+      partitionsRDD.setName(partitionsRDD.name + ", " + _name)
+    } else {
+      partitionsRDD.setName(_name)
+    }
+    this
+  }
+  setName("VertexRDD")
+
   override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
@@ -90,14 +104,14 @@ class VertexRDD[@specialized VD: ClassTag](
    * Provides the `RDD[(VertexId, VD)]` equivalent output.
    */
   override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
-    firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
+    firstParent[ShippableVertexPartition[VD]].iterator(part, context).next.iterator
   }
 
   /**
    * Applies a function to each `ShippableVertexPartition` of this RDD and returns a new VertexRDD.
    */
   private[graphx] def mapVertexPartitions[VD2: ClassTag](
-    f: VertexPartition[VD] => VertexPartition[VD2])
+      f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
     : VertexRDD[VD2] = {
     val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
     new VertexRDD(newPartitionsRDD)
@@ -208,10 +222,8 @@ class VertexRDD[@specialized VD: ClassTag](
       case _ =>
         new VertexRDD[VD3](
           partitionsRDD.zipPartitions(
-            other.partitionBy(this.partitioner.get), preservesPartitioning = true)
-          { (part, msgs) =>
-            val vertexPartition: VertexPartition[VD] = part.next()
-            Iterator(vertexPartition.leftJoin(msgs)(f))
+            other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
+            (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
           }
         )
     }
@@ -254,10 +266,8 @@ class VertexRDD[@specialized VD: ClassTag](
       case _ =>
         new VertexRDD(
           partitionsRDD.zipPartitions(
-            other.partitionBy(this.partitioner.get), preservesPartitioning = true)
-          { (part, msgs) =>
-            val vertexPartition: VertexPartition[VD] = part.next()
-            Iterator(vertexPartition.innerJoin(msgs)(f))
+            other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
+            (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
           }
         )
     }
@@ -276,14 +286,31 @@ class VertexRDD[@specialized VD: ClassTag](
    */
   def aggregateUsingIndex[VD2: ClassTag](
       messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
-    val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
+    val shuffled = messages.copartitionWithVertices(this.partitioner.get)
     val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
-      val vertexPartition: VertexPartition[VD] = thisIter.next()
-      Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
+      thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
     }
     new VertexRDD[VD2](parts)
   }
 
+  /**
+   * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
+   * [[EdgeRDD]].
+   */
+  def reverseRoutingTables(): VertexRDD[VD] =
+    this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
+
+  /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
+  private[graphx] def shipVertexAttributes(
+      shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
+    partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
+  }
+
+  /** Generates an RDD of vertex IDs suitable for shipping to the edge partitions. */
+  private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])] = {
+    partitionsRDD.mapPartitions(_.flatMap(_.shipVertexIds()))
+  }
+
 } // end of VertexRDD
 
 
@@ -293,52 +320,101 @@ class VertexRDD[@specialized VD: ClassTag](
 object VertexRDD {
 
   /**
-   * Construct a `VertexRDD` from an RDD of vertex-attribute pairs.
-   * Duplicate entries are removed arbitrarily.
+   * Constructs a standalone `VertexRDD` (one that is not set up for efficient joins with an
+   * [[EdgeRDD]]) from an RDD of vertex-attribute pairs. Duplicate entries are removed arbitrarily.
    *
    * @tparam VD the vertex attribute type
    *
-   * @param rdd the collection of vertex-attribute pairs
+   * @param vertices the collection of vertex-attribute pairs
    */
-  def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)]): VertexRDD[VD] = {
-    val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
-      case Some(p) => rdd
-      case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
+  def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+    val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
+      case Some(p) => vertices
+      case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
     }
-    val vertexPartitions = partitioned.mapPartitions(
-      iter => Iterator(VertexPartition(iter)),
+    val vertexPartitions = vPartitioned.mapPartitions(
+      iter => Iterator(ShippableVertexPartition(iter)),
       preservesPartitioning = true)
     new VertexRDD(vertexPartitions)
   }
 
   /**
-   * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs, merging duplicates using
-   * `mergeFunc`.
+   * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs. Duplicate vertex entries are
+   * removed arbitrarily. The resulting `VertexRDD` will be joinable with `edges`, and any missing
+   * vertices referred to by `edges` will be created with the attribute `defaultVal`.
    *
    * @tparam VD the vertex attribute type
    *
-   * @param rdd the collection of vertex-attribute pairs
-   * @param mergeFunc the associative, commutative merge function.
+   * @param vertices the collection of vertex-attribute pairs
+   * @param edges the [[EdgeRDD]] that these vertices may be joined with
+   * @param defaultVal the vertex attribute to use when creating missing vertices
    */
-  def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
-    val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
-      case Some(p) => rdd
-      case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
+  def apply[VD: ClassTag](
+      vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
+    VertexRDD(vertices, edges, defaultVal, (a, b) => b)
+  }
+
+  /**
+   * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs. Duplicate vertex entries are
+   * merged using `mergeFunc`. The resulting `VertexRDD` will be joinable with `edges`, and any
+   * missing vertices referred to by `edges` will be created with the attribute `defaultVal`.
+   *
+   * @tparam VD the vertex attribute type
+   *
+   * @param vertices the collection of vertex-attribute pairs
+   * @param edges the [[EdgeRDD]] that these vertices may be joined with
+   * @param defaultVal the vertex attribute to use when creating missing vertices
+   * @param mergeFunc the commutative, associative duplicate vertex attribute merge function
+   */
+  def apply[VD: ClassTag](
+      vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
+    ): VertexRDD[VD] = {
+    val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
+      case Some(p) => vertices
+      case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
+    }
+    val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
+    val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {
+      (vertexIter, routingTableIter) =>
+        val routingTable =
+          if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+        Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal))
     }
-    val vertexPartitions = partitioned.mapPartitions(
-      iter => Iterator(VertexPartition(iter)),
-      preservesPartitioning = true)
     new VertexRDD(vertexPartitions)
   }
 
   /**
-   * Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using
-   * `defaultVal` otherwise.
+   * Constructs a `VertexRDD` containing all vertices referred to in `edges`. The vertices will be
+   * created with the attribute `defaultVal`. The resulting `VertexRDD` will be joinable with
+   * `edges`.
+   *
+   * @tparam VD the vertex attribute type
+   *
+   * @param edges the [[EdgeRDD]] referring to the vertices to create
+   * @param numPartitions the desired number of partitions for the resulting `VertexRDD`
+   * @param defaultVal the vertex attribute to assign to every vertex created from `edges`
    */
-  def apply[VD: ClassTag](vids: RDD[VertexId], rdd: RDD[(VertexId, VD)], defaultVal: VD)
-    : VertexRDD[VD] = {
-    VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
-      value.getOrElse(default)
-    }
+  def fromEdges[VD: ClassTag](
+      edges: EdgeRDD[_, _], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
+    val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
+    val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
+      val routingTable =
+        if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+      Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
+    }, preservesPartitioning = true)
+    new VertexRDD(vertexPartitions)
+  }
+
+  private def createRoutingTables(
+      edges: EdgeRDD[_, _], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
+    // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
+    val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
+      Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
+      .setName("VertexRDD.createRoutingTables - vid2pid (aggregation)")
+
+    val numEdgePartitions = edges.partitions.size
+    vid2pid.copartitionWithVertices(vertexPartitioner).mapPartitions(
+      iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
+      preservesPartitioning = true)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index b7c472e..871e81f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -17,39 +17,86 @@
 
 package org.apache.spark.graphx.impl
 
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
 
 /**
- * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are
- * clustered by src.
+ * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
+ * edges are stored in 3 large columnar arrays (src, dst, attribute). The arrays are clustered by
+ * src. There is an optional active vertex set for filtering computation on the edges.
+ *
+ * @tparam ED the edge attribute type
+ * @tparam VD the vertex attribute type
  *
  * @param srcIds the source vertex id of each edge
  * @param dstIds the destination vertex id of each edge
  * @param data the attribute associated with each edge
  * @param index a clustered index on source vertex id
- * @tparam ED the edge attribute type.
+ * @param vertices a map from referenced vertex ids to their corresponding attributes. Must
+ *   contain all vertex ids from `srcIds` and `dstIds`, though not necessarily valid attributes for
+ *   those vertex ids. The mask is not used.
+ * @param activeSet an optional active vertex set for filtering computation on the edges
  */
 private[graphx]
-class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
+class EdgePartition[
+    @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
     @transient val srcIds: Array[VertexId],
     @transient val dstIds: Array[VertexId],
     @transient val data: Array[ED],
-    @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
+    @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
+    @transient val vertices: VertexPartition[VD],
+    @transient val activeSet: Option[VertexSet] = None
+  ) extends Serializable {
+
+  /** Return a new `EdgePartition` with the specified edge data. */
+  def withData[ED2: ClassTag](data_ : Array[ED2]): EdgePartition[ED2, VD] = {
+    new EdgePartition(srcIds, dstIds, data_, index, vertices, activeSet)
+  }
+
+  /** Return a new `EdgePartition` with the specified vertex partition. */
+  def withVertices[VD2: ClassTag](
+      vertices_ : VertexPartition[VD2]): EdgePartition[ED, VD2] = {
+    new EdgePartition(srcIds, dstIds, data, index, vertices_, activeSet)
+  }
+
+  /** Return a new `EdgePartition` with the specified active set, provided as an iterator. */
+  def withActiveSet(iter: Iterator[VertexId]): EdgePartition[ED, VD] = {
+    val newActiveSet = new VertexSet
+    iter.foreach(newActiveSet.add(_))
+    new EdgePartition(srcIds, dstIds, data, index, vertices, Some(newActiveSet))
+  }
+
+  /** Return a new `EdgePartition` with the specified active set. */
+  def withActiveSet(activeSet_ : Option[VertexSet]): EdgePartition[ED, VD] = {
+    new EdgePartition(srcIds, dstIds, data, index, vertices, activeSet_)
+  }
+
+  /** Return a new `EdgePartition` with updates to vertex attributes specified in `iter`. */
+  def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
+    this.withVertices(vertices.innerJoinKeepLeft(iter))
+  }
+
+  /** Look up vid in activeSet, throwing an exception if activeSet is None. */
+  def isActive(vid: VertexId): Boolean = {
+    activeSet.get.contains(vid)
+  }
+
+  /** The number of active vertices, or `None` if no active set is defined. */
+  def numActives: Option[Int] = activeSet.map(_.size)
 
   /**
    * Reverse all the edges in this partition.
    *
    * @return a new edge partition with all edges reversed.
    */
-  def reverse: EdgePartition[ED] = {
-    val builder = new EdgePartitionBuilder(size)
+  def reverse: EdgePartition[ED, VD] = {
+    val builder = new EdgePartitionBuilder(size)(classTag[ED], classTag[VD])
     for (e <- iterator) {
       builder.add(e.dstId, e.srcId, e.attr)
     }
-    builder.toEdgePartition
+    builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
   }
 
   /**
@@ -64,7 +111,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * @return a new edge partition with the result of the function `f`
    *         applied to each edge
    */
-  def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = {
+  def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2, VD] = {
     val newData = new Array[ED2](data.size)
     val edge = new Edge[ED]()
     val size = data.size
@@ -76,7 +123,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
       newData(i) = f(edge)
       i += 1
     }
-    new EdgePartition(srcIds, dstIds, newData, index)
+    this.withData(newData)
   }
 
   /**
@@ -91,7 +138,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * @tparam ED2 the type of the new attribute
    * @return a new edge partition with the attribute values replaced
    */
-  def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
+  def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2, VD] = {
     // Faster than iter.toArray, because the expected size is known.
     val newData = new Array[ED2](data.size)
     var i = 0
@@ -100,7 +147,23 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
       i += 1
     }
     assert(newData.size == i)
-    new EdgePartition(srcIds, dstIds, newData, index)
+    this.withData(newData)
+  }
+
+  /**
+   * Construct a new edge partition containing only the edges matching `epred` and where both
+   * vertices match `vpred`.
+   */
+  def filter(
+      epred: EdgeTriplet[VD, ED] => Boolean,
+      vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
+    val filtered = tripletIterator().filter(et =>
+      vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et))
+    val builder = new EdgePartitionBuilder[ED, VD]
+    for (e <- filtered) {
+      builder.add(e.srcId, e.dstId, e.attr)
+    }
+    builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
   }
 
   /**
@@ -119,8 +182,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * @param merge a commutative associative merge operation
    * @return a new edge partition without duplicate edges
    */
-  def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
-    val builder = new EdgePartitionBuilder[ED]
+  def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
+    val builder = new EdgePartitionBuilder[ED, VD]
     var currSrcId: VertexId = null.asInstanceOf[VertexId]
     var currDstId: VertexId = null.asInstanceOf[VertexId]
     var currAttr: ED = null.asInstanceOf[ED]
@@ -141,11 +204,11 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
     if (size > 0) {
       builder.add(currSrcId, currDstId, currAttr)
     }
-    builder.toEdgePartition
+    builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
   }
 
   /**
-   * Apply `f` to all edges present in both `this` and `other` and return a new EdgePartition
+   * Apply `f` to all edges present in both `this` and `other` and return a new `EdgePartition`
    * containing the resulting edges.
    *
    * If there are multiple edges with the same src and dst in `this`, `f` will be invoked once for
@@ -155,9 +218,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * once.
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
-      (other: EdgePartition[ED2])
-      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3] = {
-    val builder = new EdgePartitionBuilder[ED3]
+      (other: EdgePartition[ED2, _])
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3, VD] = {
+    val builder = new EdgePartitionBuilder[ED3, VD]
     var i = 0
     var j = 0
     // For i = index of each edge in `this`...
@@ -175,7 +238,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
       }
       i += 1
     }
-    builder.toEdgePartition
+    builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
   }
 
   /**
@@ -183,7 +246,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    *
    * @return size of the partition
    */
-  def size: Int = srcIds.size
+  val size: Int = srcIds.size
 
   /** The number of unique source vertices in the partition. */
   def indexSize: Int = index.size
@@ -212,9 +275,34 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   }
 
   /**
+   * Get an iterator over the edge triplets in this partition.
+   *
+   * It is safe to keep references to the objects from this iterator.
+   */
+  def tripletIterator(
+      includeSrc: Boolean = true, includeDst: Boolean = true): Iterator[EdgeTriplet[VD, ED]] = {
+    new EdgeTripletIterator(this, includeSrc, includeDst)
+  }
+
+  /**
+   * Upgrade the given edge iterator into a triplet iterator.
+   *
+   * Be careful not to keep references to the objects from this iterator. To improve GC performance
+   * the same object is re-used in `next()`.
+   */
+  def upgradeIterator(
+      edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
+    : Iterator[EdgeTriplet[VD, ED]] = {
+    new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
+  }
+
+  /**
    * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
    * iterator is generated using an index scan, so it is efficient at skipping edges that don't
    * match srcIdPred.
+   *
+   * Be careful not to keep references to the objects from this iterator. To improve GC performance
+   * the same object is re-used in `next()`.
    */
   def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
     index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 63ccccb..ecb49be 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -20,12 +20,14 @@ package org.apache.spark.graphx.impl
 import scala.reflect.ClassTag
 import scala.util.Sorting
 
+import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
+
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
-import org.apache.spark.util.collection.PrimitiveVector
 
 private[graphx]
-class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) {
+class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
+    size: Int = 64) {
   var edges = new PrimitiveVector[Edge[ED]](size)
 
   /** Add a new edge to the partition. */
@@ -33,7 +35,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
     edges += Edge(src, dst, d)
   }
 
-  def toEdgePartition: EdgePartition[ED] = {
+  def toEdgePartition: EdgePartition[ED, VD] = {
     val edgeArray = edges.trim().array
     Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
     val srcIds = new Array[VertexId](edgeArray.size)
@@ -57,6 +59,14 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
         i += 1
       }
     }
-    new EdgePartition(srcIds, dstIds, data, index)
+
+    // Create and populate a VertexPartition with vids from the edges, but no attributes
+    val vidsIter = srcIds.iterator ++ dstIds.iterator
+    val vertexIds = new OpenHashSet[VertexId]
+    vidsIter.foreach(vid => vertexIds.add(vid))
+    val vertices = new VertexPartition(
+      vertexIds, new Array[VD](vertexIds.capacity), vertexIds.getBitSet)
+
+    new EdgePartition(srcIds, dstIds, data, index, vertices)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 220a89d..ebb0b94 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -23,32 +23,62 @@ import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
 
 /**
- * The Iterator type returned when constructing edge triplets. This class technically could be
- * an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to
- * debug / profile.
+ * The Iterator type returned when constructing edge triplets. This could be an anonymous class in
+ * EdgePartition.tripletIterator, but we name it here explicitly so it is easier to debug / profile.
  */
 private[impl]
 class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
-    val vidToIndex: VertexIdToIndexMap,
-    val vertexArray: Array[VD],
-    val edgePartition: EdgePartition[ED])
+    val edgePartition: EdgePartition[ED, VD],
+    val includeSrc: Boolean,
+    val includeDst: Boolean)
   extends Iterator[EdgeTriplet[VD, ED]] {
 
   // Current position in the array.
   private var pos = 0
 
-  private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
-
   override def hasNext: Boolean = pos < edgePartition.size
 
   override def next() = {
     val triplet = new EdgeTriplet[VD, ED]
     triplet.srcId = edgePartition.srcIds(pos)
-    triplet.srcAttr = vmap(triplet.srcId)
+    if (includeSrc) {
+      triplet.srcAttr = edgePartition.vertices(triplet.srcId)
+    }
     triplet.dstId = edgePartition.dstIds(pos)
-    triplet.dstAttr = vmap(triplet.dstId)
+    if (includeDst) {
+      triplet.dstAttr = edgePartition.vertices(triplet.dstId)
+    }
     triplet.attr = edgePartition.data(pos)
     pos += 1
     triplet
   }
 }
+
+/**
+ * An Iterator type for internal use that reuses EdgeTriplet objects. This could be an anonymous
+ * class in EdgePartition.upgradeIterator, but we name it here explicitly so it is easier to debug /
+ * profile.
+ */
+private[impl]
+class ReusingEdgeTripletIterator[VD: ClassTag, ED: ClassTag](
+    val edgeIter: Iterator[Edge[ED]],
+    val edgePartition: EdgePartition[ED, VD],
+    val includeSrc: Boolean,
+    val includeDst: Boolean)
+  extends Iterator[EdgeTriplet[VD, ED]] {
+
+  private val triplet = new EdgeTriplet[VD, ED]
+
+  override def hasNext = edgeIter.hasNext
+
+  override def next() = {
+    triplet.set(edgeIter.next())
+    if (includeSrc) {
+      triplet.srcAttr = edgePartition.vertices(triplet.srcId)
+    }
+    if (includeDst) {
+      triplet.dstAttr = edgePartition.vertices(triplet.dstId)
+    }
+    triplet
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 9eabccd..2f2d0e0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -19,54 +19,45 @@ package org.apache.spark.graphx.impl
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark.util.collection.PrimitiveVector
-import org.apache.spark.{HashPartitioner, Partitioner}
+import org.apache.spark.HashPartitioner
 import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.{RDD, ShuffledRDD}
+import org.apache.spark.storage.StorageLevel
+
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.impl.GraphImpl._
 import org.apache.spark.graphx.impl.MsgRDDFunctions._
 import org.apache.spark.graphx.util.BytecodeUtils
-import org.apache.spark.rdd.{ShuffledRDD, RDD}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.ClosureCleaner
 
 
 /**
- * A graph that supports computation on graphs.
+ * An implementation of [[org.apache.spark.graphx.Graph]] to support computation on graphs.
  *
- * Graphs are represented using two classes of data: vertex-partitioned and
- * edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges`
- * contains edge attributes, which are edge-partitioned. For operations on vertex neighborhoods,
- * vertex attributes are replicated to the edge partitions where they appear as sources or
- * destinations. `routingTable` stores the routing information for shipping vertex attributes to
- * edge partitions. `replicatedVertexView` stores a view of the replicated vertex attributes created
- * using the routing table.
+ * Graphs are represented using two RDDs: `vertices`, which contains vertex attributes and the
+ * routing information for shipping vertex attributes to edge partitions, and
+ * `replicatedVertexView`, which contains edges and the vertex attributes mentioned by each edge.
  */
 class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     @transient val vertices: VertexRDD[VD],
-    @transient val edges: EdgeRDD[ED],
-    @transient val routingTable: RoutingTable,
-    @transient val replicatedVertexView: ReplicatedVertexView[VD])
+    @transient val replicatedVertexView: ReplicatedVertexView[VD, ED])
   extends Graph[VD, ED] with Serializable {
 
   /** Default constructor is provided to support serialization */
-  protected def this() = this(null, null, null, null)
+  protected def this() = this(null, null)
+
+  @transient override val edges: EdgeRDD[ED, VD] = replicatedVertexView.edges
 
   /** Return a RDD that brings edges together with their source and destination vertices. */
-  @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
-    val vdTag = classTag[VD]
-    val edTag = classTag[ED]
-    edges.partitionsRDD.zipPartitions(
-      replicatedVertexView.get(true, true), true) { (ePartIter, vPartIter) =>
-      val (pid, ePart) = ePartIter.next()
-      val (_, vPart) = vPartIter.next()
-      new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
-    }
+  @transient override lazy val triplets: RDD[EdgeTriplet[VD, ED]] = {
+    replicatedVertexView.upgrade(vertices, true, true)
+    replicatedVertexView.edges.partitionsRDD.mapPartitions(_.flatMap {
+      case (pid, part) => part.tripletIterator()
+    })
   }
 
   override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
     vertices.persist(newLevel)
-    edges.persist(newLevel)
+    replicatedVertexView.edges.persist(newLevel)
     this
   }
 
@@ -74,14 +65,15 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
     vertices.unpersist(blocking)
-    replicatedVertexView.unpersist(blocking)
+    // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone
     this
   }
 
   override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
-    val numPartitions = edges.partitions.size
+    val numPartitions = replicatedVertexView.edges.partitions.size
     val edTag = classTag[ED]
-    val newEdges = new EdgeRDD(edges.map { e =>
+    val vdTag = classTag[VD]
+    val newEdges = new EdgeRDD(replicatedVertexView.edges.map { e =>
       val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
 
       // Should we be using 3-tuple or an optimized class
@@ -89,105 +81,79 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     }
       .partitionBy(new HashPartitioner(numPartitions))
       .mapPartitionsWithIndex( { (pid, iter) =>
-        val builder = new EdgePartitionBuilder[ED]()(edTag)
+        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
         iter.foreach { message =>
           val data = message.data
           builder.add(data._1, data._2, data._3)
         }
         val edgePartition = builder.toEdgePartition
         Iterator((pid, edgePartition))
-      }, preservesPartitioning = true).cache())
-    GraphImpl(vertices, newEdges)
+      }, preservesPartitioning = true))
+    GraphImpl.fromExistingRDDs(vertices, newEdges)
   }
 
   override def reverse: Graph[VD, ED] = {
-    val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
-    GraphImpl(vertices, newETable)
+    new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
   }
 
   override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
     if (classTag[VD] equals classTag[VD2]) {
+      vertices.cache()
       // The map preserves type, so we can use incremental replication
       val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
       val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
-      val newReplicatedVertexView = new ReplicatedVertexView[VD2](
-        changedVerts, edges, routingTable,
-        Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
-      new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
+      val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
+        .updateVertices(changedVerts)
+      new GraphImpl(newVerts, newReplicatedVertexView)
     } else {
       // The map does not preserve type, so we must re-replicate all vertices
-      GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable)
+      GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
     }
   }
 
   override def mapEdges[ED2: ClassTag](
       f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
-    val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
-    new GraphImpl(vertices, newETable , routingTable, replicatedVertexView)
+    val newEdges = replicatedVertexView.edges
+      .mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
+    new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
   }
 
   override def mapTriplets[ED2: ClassTag](
       f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
-    val newEdgePartitions =
-      edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) {
-        (ePartIter, vTableReplicatedIter) =>
-        val (ePid, edgePartition) = ePartIter.next()
-        val (vPid, vPart) = vTableReplicatedIter.next()
-        assert(!vTableReplicatedIter.hasNext)
-        assert(ePid == vPid)
-        val et = new EdgeTriplet[VD, ED]
-        val inputIterator = edgePartition.iterator.map { e =>
-          et.set(e)
-          et.srcAttr = vPart(e.srcId)
-          et.dstAttr = vPart(e.dstId)
-          et
-        }
-        // Apply the user function to the vertex partition
-        val outputIter = f(ePid, inputIterator)
-        // Consume the iterator to update the edge attributes
-        val newEdgePartition = edgePartition.map(outputIter)
-        Iterator((ePid, newEdgePartition))
-      }
-    new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView)
+    vertices.cache()
+    val mapUsesSrcAttr = accessesVertexAttr(f, "srcAttr")
+    val mapUsesDstAttr = accessesVertexAttr(f, "dstAttr")
+    replicatedVertexView.upgrade(vertices, mapUsesSrcAttr, mapUsesDstAttr)
+    val newEdges = replicatedVertexView.edges.mapEdgePartitions { (pid, part) =>
+      part.map(f(pid, part.tripletIterator(mapUsesSrcAttr, mapUsesDstAttr)))
+    }
+    new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
   }
 
   override def subgraph(
       epred: EdgeTriplet[VD, ED] => Boolean = x => true,
       vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
+    vertices.cache()
     // Filter the vertices, reusing the partitioner and the index from this graph
     val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
-
-    // Filter the edges
-    val edTag = classTag[ED]
-    val newEdges = new EdgeRDD[ED](triplets.filter { et =>
-      vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)
-    }.mapPartitionsWithIndex( { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[ED]()(edTag)
-      iter.foreach { et => builder.add(et.srcId, et.dstId, et.attr) }
-      val edgePartition = builder.toEdgePartition
-      Iterator((pid, edgePartition))
-    }, preservesPartitioning = true)).cache()
-
-    // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
-    // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
-    // an edge.
-    new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView)
-  } // end of subgraph
+    // Filter the triplets. We must always upgrade the triplet view fully because vpred always runs
+    // on both src and dst vertices
+    replicatedVertexView.upgrade(vertices, true, true)
+    val newEdges = replicatedVertexView.edges.filter(epred, vpred)
+    new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
+  }
 
   override def mask[VD2: ClassTag, ED2: ClassTag] (
       other: Graph[VD2, ED2]): Graph[VD, ED] = {
     val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
-    val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v }
-    // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
-    // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
-    // an edge.
-    new GraphImpl(newVerts, newEdges, routingTable, replicatedVertexView)
+    val newEdges = replicatedVertexView.edges.innerJoin(other.edges) { (src, dst, v, w) => v }
+    new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
   }
 
   override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
-    ClosureCleaner.clean(merge)
-    val newETable = edges.mapEdgePartitions((pid, part) => part.groupEdges(merge))
-    new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
+    val newEdges = replicatedVertexView.edges.mapEdgePartitions(
+      (pid, part) => part.groupEdges(merge))
+    new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
   }
 
   // ///////////////////////////////////////////////////////////////////////////////////////////////
@@ -199,68 +165,58 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
       reduceFunc: (A, A) => A,
       activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {
 
-    ClosureCleaner.clean(mapFunc)
-    ClosureCleaner.clean(reduceFunc)
+    vertices.cache()
 
     // For each vertex, replicate its attribute only to partitions where it is
     // in the relevant position in an edge.
     val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
     val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
-    val vs = activeSetOpt match {
+    replicatedVertexView.upgrade(vertices, mapUsesSrcAttr, mapUsesDstAttr)
+    val view = activeSetOpt match {
       case Some((activeSet, _)) =>
-        replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
+        replicatedVertexView.withActiveSet(activeSet)
       case None =>
-        replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr)
+        replicatedVertexView
     }
     val activeDirectionOpt = activeSetOpt.map(_._2)
 
     // Map and combine.
-    val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) =>
-      val (ePid, edgePartition) = ePartIter.next()
-      val (vPid, vPart) = vPartIter.next()
-      assert(!vPartIter.hasNext)
-      assert(ePid == vPid)
-      // Choose scan method
-      val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
-      val edgeIter = activeDirectionOpt match {
-        case Some(EdgeDirection.Both) =>
-          if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
-              .filter(e => vPart.isActive(e.dstId))
-          } else {
-            edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
-          }
-        case Some(EdgeDirection.Either) =>
-          // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
-          // the index here. Instead we have to scan all edges and then do the filter.
-          edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
-        case Some(EdgeDirection.Out) =>
-          if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
-          } else {
-            edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
-          }
-        case Some(EdgeDirection.In) =>
-          edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
-        case _ => // None
-          edgePartition.iterator
-      }
-
-      // Scan edges and run the map function
-      val et = new EdgeTriplet[VD, ED]
-      val mapOutputs = edgeIter.flatMap { e =>
-        et.set(e)
-        if (mapUsesSrcAttr) {
-          et.srcAttr = vPart(e.srcId)
-        }
-        if (mapUsesDstAttr) {
-          et.dstAttr = vPart(e.dstId)
+    val preAgg = view.edges.partitionsRDD.mapPartitions(_.flatMap {
+      case (pid, edgePartition) =>
+        // Choose scan method
+        val activeFraction = edgePartition.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
+        val edgeIter = activeDirectionOpt match {
+          case Some(EdgeDirection.Both) =>
+            if (activeFraction < 0.8) {
+              edgePartition.indexIterator(srcVertexId => edgePartition.isActive(srcVertexId))
+                .filter(e => edgePartition.isActive(e.dstId))
+            } else {
+              edgePartition.iterator.filter(e =>
+                edgePartition.isActive(e.srcId) && edgePartition.isActive(e.dstId))
+            }
+          case Some(EdgeDirection.Either) =>
+            // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
+            // the index here. Instead we have to scan all edges and then do the filter.
+            edgePartition.iterator.filter(e =>
+              edgePartition.isActive(e.srcId) || edgePartition.isActive(e.dstId))
+          case Some(EdgeDirection.Out) =>
+            if (activeFraction < 0.8) {
+              edgePartition.indexIterator(srcVertexId => edgePartition.isActive(srcVertexId))
+            } else {
+              edgePartition.iterator.filter(e => edgePartition.isActive(e.srcId))
+            }
+          case Some(EdgeDirection.In) =>
+            edgePartition.iterator.filter(e => edgePartition.isActive(e.dstId))
+          case _ => // None
+            edgePartition.iterator
         }
-        mapFunc(et)
-      }
-      // Note: This doesn't allow users to send messages to arbitrary vertices.
-      vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
-    }
+
+        // Scan edges and run the map function
+        val mapOutputs = edgePartition.upgradeIterator(edgeIter, mapUsesSrcAttr, mapUsesDstAttr)
+          .flatMap(mapFunc(_))
+        // Note: This doesn't allow users to send messages to arbitrary vertices.
+        edgePartition.vertices.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
+    }).setName("GraphImpl.mapReduceTriplets - preAgg")
 
     // do the final reduction reusing the index map
     vertices.aggregateUsingIndex(preAgg, reduceFunc)
@@ -268,20 +224,19 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 
   override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
       (other: RDD[(VertexId, U)])
-      (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] =
-  {
+      (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] = {
     if (classTag[VD] equals classTag[VD2]) {
+      vertices.cache()
       // updateF preserves type, so we can use incremental replication
-      val newVerts = vertices.leftJoin(other)(updateF)
+      val newVerts = vertices.leftJoin(other)(updateF).cache()
       val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
-      val newReplicatedVertexView = new ReplicatedVertexView[VD2](
-        changedVerts, edges, routingTable,
-        Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
-      new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
+      val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
+        .updateVertices(changedVerts)
+      new GraphImpl(newVerts, newReplicatedVertexView)
     } else {
       // updateF does not preserve type, so we must re-replicate all vertices
       val newVerts = vertices.leftJoin(other)(updateF)
-      GraphImpl(newVerts, edges, routingTable)
+      GraphImpl(newVerts, replicatedVertexView.edges)
     }
   }
 
@@ -298,73 +253,68 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 
 object GraphImpl {
 
+  /** Create a graph from edges, setting referenced vertices to `defaultVertexAttr`. */
   def apply[VD: ClassTag, ED: ClassTag](
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] =
-  {
+      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
     fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
   }
 
+  /** Create a graph from EdgePartitions, setting referenced vertices to `defaultVertexAttr`. */
   def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
-      edgePartitions: RDD[(PartitionID, EdgePartition[ED])],
+      edgePartitions: RDD[(PartitionID, EdgePartition[ED, VD])],
       defaultVertexAttr: VD): GraphImpl[VD, ED] = {
     fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
   }
 
+  /** Create a graph from vertices and edges, setting missing vertices to `defaultVertexAttr`. */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] =
-  {
-    val edgeRDD = createEdgeRDD(edges).cache()
-
-    // Get the set of all vids
-    val partitioner = Partitioner.defaultPartitioner(vertices)
-    val vPartitioned = vertices.partitionBy(partitioner)
-    val vidsFromEdges = collectVertexIdsFromEdges(edgeRDD, partitioner)
-    val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
-      vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
-    }
-
-    val vertexRDD = VertexRDD(vids, vPartitioned, defaultVertexAttr)
-
+      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+    val edgeRDD = createEdgeRDD(edges)(classTag[ED], classTag[VD]).cache()
+    val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
     GraphImpl(vertexRDD, edgeRDD)
   }
 
+  /** Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
-      edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
-    // Cache RDDs that are referenced multiple times
-    edges.cache()
-
-    GraphImpl(vertices, edges, new RoutingTable(edges, vertices))
+      edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
+    // Convert the vertex partitions in edges to the correct type
+    val newEdges = edges.mapEdgePartitions(
+      (pid, part) => part.withVertices(part.vertices.map(
+        (vid, attr) => null.asInstanceOf[VD])))
+    GraphImpl.fromExistingRDDs(vertices, newEdges)
   }
 
-  def apply[VD: ClassTag, ED: ClassTag](
+  /**
+   * Create a graph from a VertexRDD and an EdgeRDD with the same replicated vertex type as the
+   * vertices.
+   */
+  def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
-      edges: EdgeRDD[ED],
-      routingTable: RoutingTable): GraphImpl[VD, ED] = {
-    // Cache RDDs that are referenced multiple times. `routingTable` is cached by default, so we
-    // don't cache it explicitly.
-    vertices.cache()
-    edges.cache()
-
-    new GraphImpl(
-      vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable))
+      edges: EdgeRDD[ED, VD]): GraphImpl[VD, ED] = {
+    new GraphImpl(vertices, new ReplicatedVertexView(edges))
   }
 
   /**
-   * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
-   * data structure (RDD[(VertexId, VertexId, ED)]).
-   *
-   * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
-   * pair: the key is the partition id, and the value is an EdgePartition object containing all the
-   * edges in a partition.
+   * Create a graph from an EdgeRDD with the correct vertex type, setting missing vertices to
+   * `defaultVertexAttr`. The vertices will have the same number of partitions as the EdgeRDD.
    */
-  private def createEdgeRDD[ED: ClassTag](
-      edges: RDD[Edge[ED]]): EdgeRDD[ED] = {
+  private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
+      edges: EdgeRDD[ED, VD],
+      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+    edges.cache()
+    val vertices = VertexRDD.fromEdges(edges, edges.partitions.size, defaultVertexAttr)
+    fromExistingRDDs(vertices, edges)
+  }
+
+  /** Create an EdgeRDD from a set of edges. */
+  private def createEdgeRDD[ED: ClassTag, VD: ClassTag](
+      edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
     val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[ED]
+      val builder = new EdgePartitionBuilder[ED, VD]
       iter.foreach { e =>
         builder.add(e.srcId, e.dstId, e.attr)
       }
@@ -373,24 +323,4 @@ object GraphImpl {
     new EdgeRDD(edgePartitions)
   }
 
-  private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
-      edges: EdgeRDD[ED],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    edges.cache()
-    // Get the set of all vids
-    val vids = collectVertexIdsFromEdges(edges, new HashPartitioner(edges.partitions.size))
-    // Create the VertexRDD.
-    val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
-    GraphImpl(vertices, edges)
-  }
-
-  /** Collects all vids mentioned in edges and partitions them by partitioner. */
-  private def collectVertexIdsFromEdges(
-      edges: EdgeRDD[_],
-      partitioner: Partitioner): RDD[(VertexId, Int)] = {
-    // TODO: Consider doing map side distinct before shuffle.
-    new ShuffledRDD[VertexId, Int, (VertexId, Int)](
-      edges.collectVertexIds.map(vid => (vid, 0)), partitioner)
-      .setSerializer(new VertexIdMsgSerializer)
-  }
 } // end of object GraphImpl

http://git-wip-us.apache.org/repos/asf/spark/blob/905173df/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
index c45ba3d..1c6d7e5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
@@ -89,7 +89,6 @@ class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) {
 
 }
 
-
 private[graphx]
 object MsgRDDFunctions {
   implicit def rdd2PartitionRDDFunctions[T: ClassTag](rdd: RDD[MessageToPartition[T]]) = {
@@ -99,18 +98,28 @@ object MsgRDDFunctions {
   implicit def rdd2vertexMessageRDDFunctions[T: ClassTag](rdd: RDD[VertexBroadcastMsg[T]]) = {
     new VertexBroadcastMsgRDDFunctions(rdd)
   }
+}
 
-  def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexId, T)], partitioner: Partitioner) = {
-    val rdd = new ShuffledRDD[VertexId, T, (VertexId, T)](msgs, partitioner)
+private[graphx]
+class VertexRDDFunctions[VD: ClassTag](self: RDD[(VertexId, VD)]) {
+  def copartitionWithVertices(partitioner: Partitioner): RDD[(VertexId, VD)] = {
+    val rdd = new ShuffledRDD[VertexId, VD, (VertexId, VD)](self, partitioner)
 
     // Set a custom serializer if the data is of int or double type.
-    if (classTag[T] == ClassTag.Int) {
+    if (classTag[VD] == ClassTag.Int) {
       rdd.setSerializer(new IntAggMsgSerializer)
-    } else if (classTag[T] == ClassTag.Long) {
+    } else if (classTag[VD] == ClassTag.Long) {
       rdd.setSerializer(new LongAggMsgSerializer)
-    } else if (classTag[T] == ClassTag.Double) {
+    } else if (classTag[VD] == ClassTag.Double) {
       rdd.setSerializer(new DoubleAggMsgSerializer)
     }
     rdd
   }
 }
+
+private[graphx]
+object VertexRDDFunctions {
+  implicit def rdd2VertexRDDFunctions[VD: ClassTag](rdd: RDD[(VertexId, VD)]) = {
+    new VertexRDDFunctions(rdd)
+  }
+}