Posted to reviews@spark.apache.org by ankurdave <gi...@git.apache.org> on 2014/04/23 06:49:12 UTC

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

GitHub user ankurdave opened a pull request:

    https://github.com/apache/spark/pull/497

    Unify GraphImpl RDDs + other graph load optimizations

    This PR makes the following changes, primarily in e4fbd329aef85fe2c38b0167255d2a712893d683:
    
    1. *Unify RDDs to avoid zipPartitions.* A graph used to be four RDDs: vertices, edges, routing table, and triplet view. This commit merges them down to two: vertices (with routing table), and edges (with replicated vertices).
    
    2. *Avoid duplicate shuffle in graph building.* We used to do two shuffles when building a graph: one to extract routing information from the edges and move it to the vertices, and another to find nonexistent vertices referred to by edges. With this commit, the latter is done as a side effect of the former.
    
    3. *Avoid no-op shuffle when joins are fully eliminated.* This is a side effect of unifying the edges and the triplet view.
    
    4. *Join elimination for mapTriplets.*
    
    5. *Ship only the needed vertex attributes when upgrading the triplet view.* If the triplet view already contains source attributes, and we now need both attributes, only ship destination attributes rather than re-shipping both. This is done in `ReplicatedVertexView#upgrade`.
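
    A usage-level sketch of the operations these changes target (a hedged
    example, not taken from the PR; the edge-list path, app name, and
    partition count are illustrative):

        import org.apache.spark.SparkContext
        import org.apache.spark.graphx.GraphLoader

        object TripletSketch {
          def main(args: Array[String]) {
            val sc = new SparkContext("local", "TripletSketch")
            // Graph building (changes 2 and 3): routing information and
            // isolated vertices are now found in a single shuffle.
            val graph = GraphLoader.edgeListFile(sc, "data/edges.txt",
              minEdgePartitions = 8).cache()
            // mapTriplets (change 4): a map function that never reads
            // dstAttr allows the destination-side join to be eliminated.
            val weighted = graph.mapTriplets(t => t.attr * t.srcAttr)
            println(weighted.edges.count())
          }
        }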

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ankurdave/spark unify-rdds

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/497.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #497
    
----
commit d64e8d49ec43d2fd6bde519b8aeafb5cc4f1be61
Author: Ankur Dave <an...@gmail.com>
Date:   2014-04-20T03:09:24Z

    Log current Pregel iteration

commit 62c7b7851301890814d523513fc5b67a0eb781ab
Author: Ankur Dave <an...@gmail.com>
Date:   2014-04-20T03:10:36Z

    In Analytics, take PageRank numIter

commit d6d60e21bfc97fa39b25aa875c96ca9fc05a9973
Author: Ankur Dave <an...@gmail.com>
Date:   2014-04-20T03:08:57Z

    In GraphLoader, coalesce to minEdgePartitions

commit e4fbd329aef85fe2c38b0167255d2a712893d683
Author: Ankur Dave <an...@gmail.com>
Date:   2014-04-13T02:18:37Z

    Unify GraphImpl RDDs + other graph load optimizations
    
    This commit makes the following changes:
    
    1. *Unify RDDs to avoid zipPartitions.* A graph used to be four RDDs:
    vertices, edges, routing table, and triplet view. This commit merges
    them down to two: vertices (with routing table), and edges (with
    replicated vertices).
    
    2. *Avoid duplicate shuffle in graph building.* We used to do two shuffles
    when building a graph: one to extract routing information from the edges
    and move it to the vertices, and another to find nonexistent vertices
    referred to by edges. With this commit, the latter is done as a side
    effect of the former.
    
    3. *Avoid no-op shuffle when joins are fully eliminated.* This is a side
    effect of unifying the edges and the triplet view.
    
    4. *Join elimination for mapTriplets.*
    
    5. *Ship only the needed vertex attributes when upgrading the
    triplet view.* If the triplet view already contains source attributes,
    and we now need both attributes, only ship destination attributes rather
    than re-shipping both. This is done in `ReplicatedVertexView#upgrade`.

----



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41124707
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14355/



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42753685
  
     Merged build triggered. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12453849
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
    @@ -17,39 +17,86 @@
     
     package org.apache.spark.graphx.impl
     
    -import scala.reflect.ClassTag
    +import scala.reflect.{classTag, ClassTag}
     
     import org.apache.spark.graphx._
     import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
     
     /**
    - * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are
    - * clustered by src.
    + * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
    + * edges are stored in 3 large columnar arrays (src, dst, attribute). The arrays are clustered by
    + * src. There is an optional active vertex set for filtering computation on the edges.
    + *
    + * @tparam ED the edge attribute type
    + * @tparam VD the vertex attribute type
      *
      * @param srcIds the source vertex id of each edge
      * @param dstIds the destination vertex id of each edge
      * @param data the attribute associated with each edge
      * @param index a clustered index on source vertex id
    - * @tparam ED the edge attribute type.
    + * @param vertices a map from referenced vertex ids to their corresponding attributes. Must
    + *   contain all vertex ids from `srcIds` and `dstIds`, though not necessarily valid attributes for
    + *   those vertex ids. The mask is not used.
    + * @param activeSet an optional active vertex set for filtering computation on the edges
      */
     private[graphx]
    -class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
    +class EdgePartition[
    +    @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
    --- End diff --
    
    specialize VD
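
    A sketch of what the suggestion amounts to, as a standalone stand-in
    rather than the real class (reusing ED's primitive list for VD is an
    assumption):

        import scala.reflect.ClassTag

        class EdgePartitionSketch[
            @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag,
            @specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassTag](
            val srcIds: Array[Long],
            val dstIds: Array[Long],
            val data: Array[ED])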



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41127458
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14358/



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12453694
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala ---
    @@ -276,14 +286,31 @@ class VertexRDD[@specialized VD: ClassTag](
        */
       def aggregateUsingIndex[VD2: ClassTag](
           messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
    -    val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
    +    val shuffled = messages.copartitionWithVertices(this.partitioner.get)
         val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
    -      val vertexPartition: VertexPartition[VD] = thisIter.next()
    -      Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
    +      thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
         }
         new VertexRDD[VD2](parts)
       }
     
    +  /**
    +   * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
    +   * [[EdgeRDD]].
    +   */
    +  def reverseRoutingTables(): VertexRDD[VD] =
    +    this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
    +
    +  /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
    +  private[graphx] def shipVertexAttributes(
    --- End diff --
    
    Should partitionBy before returning
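
    A standalone illustration of the comment, with all names hypothetical:
    a method that produces keyed blocks destined for another RDD's
    partitions should co-partition its output before returning it, so
    callers cannot forget to.

        import org.apache.spark.{HashPartitioner, Partitioner, SparkContext}
        import org.apache.spark.SparkContext._
        import org.apache.spark.rdd.RDD

        object ShipThenPartition {
          // partitionBy before returning, so every caller receives blocks
          // already co-partitioned with the consumer.
          def shipBlocks(blocks: RDD[(Int, String)], target: Partitioner): RDD[(Int, String)] =
            blocks.partitionBy(target)

          def main(args: Array[String]) {
            val sc = new SparkContext("local", "ShipThenPartition")
            val blocks = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c")))
            println(shipBlocks(blocks, new HashPartitioner(2)).partitioner)
          }
        }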



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42755314
  
    Thanks everyone - I'm going to pull this in.



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42340933
  
    Build started. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12340534
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala ---
    @@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
         if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
     
       override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
    +
    +  def toTuple = ((srcId, srcAttr), (dstId, dstAttr), attr)
    --- End diff --
    
    Add an explicit return type to this
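
    The fix amounts to spelling out the inferred tuple type, with the types
    taken from the surrounding EdgeTriplet[VD, ED] definition:

        def toTuple: ((VertexId, VD), (VertexId, VD), ED) =
          ((srcId, srcAttr), (dstId, dstAttr), attr)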



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41124619
  
    Merged build started. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12456034
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala ---
    @@ -21,192 +21,102 @@ import scala.reflect.{classTag, ClassTag}
     
     import org.apache.spark.SparkContext._
     import org.apache.spark.rdd.RDD
    -import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
     
     import org.apache.spark.graphx._
     
     /**
    - * A view of the vertices after they are shipped to the join sites specified in
    - * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is
    - * specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a
    - * fresh view is created.
    - *
    - * The view is always cached (i.e., once it is evaluated, it remains materialized). This avoids
    - * constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for
    - * example. However, it means iterative algorithms must manually call `Graph.unpersist` on previous
    - * iterations' graphs for best GC performance. See the implementation of
    - * [[org.apache.spark.graphx.Pregel]] for an example.
    + * Manages shipping vertex attributes to the edge partitions of an
    + * [[org.apache.spark.graphx.EdgeRDD]]. Vertex attributes may be partially shipped to construct a
    + * triplet view with vertex attributes on only one side, and they may be updated. An active vertex
    + * set may additionally be shipped to the edge partitions. Be careful not to store a reference to
    + * `edges`, since it may be modified when the attribute shipping level is upgraded.
      */
     private[impl]
    -class ReplicatedVertexView[VD: ClassTag](
    -    updatedVerts: VertexRDD[VD],
    -    edges: EdgeRDD[_],
    -    routingTable: RoutingTable,
    -    prevViewOpt: Option[ReplicatedVertexView[VD]] = None) {
    +class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
    +    var edges: EdgeRDD[ED, VD],
    +    var hasSrcId: Boolean = false,
    +    var hasDstId: Boolean = false) {
     
       /**
    -   * Within each edge partition, create a local map from vid to an index into the attribute
    -   * array. Each map contains a superset of the vertices that it will receive, because it stores
    -   * vids from both the source and destination of edges. It must always include both source and
    -   * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
    +   * Return a new `ReplicatedVertexView` with the specified `EdgeRDD`, which must have the same
    +   * shipping level.
        */
    -  private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
    -    case Some(prevView) =>
    -      prevView.localVertexIdMap
    -    case None =>
    -      edges.partitionsRDD.mapPartitions(_.map {
    -        case (pid, epart) =>
    -          val vidToIndex = new VertexIdToIndexMap
    -          epart.foreach { e =>
    -            vidToIndex.add(e.srcId)
    -            vidToIndex.add(e.dstId)
    -          }
    -          (pid, vidToIndex)
    -      }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap")
    -  }
    -
    -  private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
    -  private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false)
    -  private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true)
    -  private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false)
    -
    -  def unpersist(blocking: Boolean = true): ReplicatedVertexView[VD] = {
    -    bothAttrs.unpersist(blocking)
    -    srcAttrOnly.unpersist(blocking)
    -    dstAttrOnly.unpersist(blocking)
    -    noAttrs.unpersist(blocking)
    -    // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it
    -    // without modification
    -    this
    +  def withEdges[VD2: ClassTag, ED2: ClassTag](
    +      edges_ : EdgeRDD[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = {
    +    new ReplicatedVertexView(edges_, hasSrcId, hasDstId)
       }
     
    -  def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = {
    -    (includeSrc, includeDst) match {
    -      case (true, true) => bothAttrs
    -      case (true, false) => srcAttrOnly
    -      case (false, true) => dstAttrOnly
    -      case (false, false) => noAttrs
    -    }
    +  /**
    +   * Return a new `ReplicatedVertexView` where edges are reversed and shipping levels are swapped to
    +   * match.
    +   */
    +  def reverse() = {
    +    val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
    +    new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
       }
     
    -  def get(
    -      includeSrc: Boolean,
    -      includeDst: Boolean,
    -      actives: VertexRDD[_]): RDD[(PartitionID, VertexPartition[VD])] = {
    -    // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
    -    // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
    -    // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
    -    // also shipped there.
    -    val shippedActives = routingTable.get(true, true)
    -      .zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _))
    -      .partitionBy(edges.partitioner.get)
    -    // Update the view with shippedActives, setting activeness flags in the resulting
    -    // VertexPartitions
    -    get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) =>
    -      val (pid, vPart) = viewIter.next()
    -      val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator))
    -      Iterator((pid, newPart))
    +  /**
    +   * Upgrade the shipping level in-place to the specified levels by shipping vertex attributes from
    +   * `vertices`. This operation modifies the `ReplicatedVertexView`, and callers can access `edges`
    +   * afterwards to obtain the upgraded view.
    +   */
    +  def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean) {
    +    val shipSrc = includeSrc && !hasSrcId
    +    val shipDst = includeDst && !hasDstId
    +    if (shipSrc || shipDst) {
    +      val shippedVerts: RDD[(Int, VertexAttributeBlock[VD])] =
    +        vertices.shipVertexAttributes(shipSrc, shipDst)
    +          .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format(
    +            includeSrc, includeDst, shipSrc, shipDst))
    +          .partitionBy(edges.partitioner.get)
    +      val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
    +        (ePartIter, shippedVertsIter) => ePartIter.map {
    +          case (pid, edgePartition) =>
    +            (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
    +        }
    +      })
    +      edges = newEdges
    +      hasSrcId = includeSrc
    +      hasDstId = includeDst
         }
       }
     
    -  private def create(includeSrc: Boolean, includeDst: Boolean)
    -    : RDD[(PartitionID, VertexPartition[VD])] = {
    -    val vdTag = classTag[VD]
    -
    -    // Ship vertex attributes to edge partitions according to vertexPlacement
    -    val verts = updatedVerts.partitionsRDD
    -    val shippedVerts = routingTable.get(includeSrc, includeDst)
    -      .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdTag))
    +  /**
    +   * Return a new `ReplicatedVertexView` where the `activeSet` in each edge partition contains only
    +   * vertex ids present in `actives`. This ships a vertex id to all edge partitions where it is
    +   * referenced, ignoring the attribute shipping level.
    +   */
    +  def withActiveSet(actives: VertexRDD[_]): ReplicatedVertexView[VD, ED] = {
    +    val shippedActives = actives.shipVertexIds()
    +      .setName("ReplicatedVertexView.withActiveSet - shippedActives (broadcast)")
           .partitionBy(edges.partitioner.get)
    -    // TODO: Consider using a specialized shuffler.
    -
    -    prevViewOpt match {
    -      case Some(prevView) =>
    -        // Update prevView with shippedVerts, setting staleness flags in the resulting
    -        // VertexPartitions
    -        prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) {
    -          (prevViewIter, shippedVertsIter) =>
    -            val (pid, prevVPart) = prevViewIter.next()
    -            val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
    -            Iterator((pid, newVPart))
    -        }.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst))
     
    -      case None =>
    -        // Within each edge partition, place the shipped vertex attributes into the correct
    -        // locations specified in localVertexIdMap
    -        localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
    -          val (pid, vidToIndex) = mapIter.next()
    -          assert(!mapIter.hasNext)
    -          // Populate the vertex array using the vidToIndex map
    -          val vertexArray = vdTag.newArray(vidToIndex.capacity)
    -          for ((_, block) <- shippedVertsIter) {
    -            for (i <- 0 until block.vids.size) {
    -              val vid = block.vids(i)
    -              val attr = block.attrs(i)
    -              val ind = vidToIndex.getPos(vid)
    -              vertexArray(ind) = attr
    -            }
    -          }
    -          val newVPart = new VertexPartition(
    -            vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag)
    -          Iterator((pid, newVPart))
    -        }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst))
    -    }
    -  }
    -}
    -
    -private object ReplicatedVertexView {
    -  protected def buildBuffer[VD: ClassTag](
    -      pid2vidIter: Iterator[Array[Array[VertexId]]],
    -      vertexPartIter: Iterator[VertexPartition[VD]]) = {
    -    val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
    -    val vertexPart: VertexPartition[VD] = vertexPartIter.next()
    -
    -    Iterator.tabulate(pid2vid.size) { pid =>
    -      val vidsCandidate = pid2vid(pid)
    -      val size = vidsCandidate.length
    -      val vids = new PrimitiveVector[VertexId](pid2vid(pid).size)
    -      val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
    -      var i = 0
    -      while (i < size) {
    -        val vid = vidsCandidate(i)
    -        if (vertexPart.isDefined(vid)) {
    -          vids += vid
    -          attrs += vertexPart(vid)
    -        }
    -        i += 1
    +    val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
    +      (ePartIter, shippedActivesIter) => ePartIter.map {
    +        case (pid, edgePartition) =>
    +          (pid, edgePartition.withActiveSet(shippedActivesIter.flatMap(_._2.iterator)))
           }
    -      (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
    -    }
    +    })
    +    new ReplicatedVertexView(newEdges, hasSrcId, hasDstId)
    --- End diff --
    
    Use withEdges
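
    That is, the last line of the method would become a call to the helper
    defined earlier in this file instead of invoking the constructor
    directly:

        withEdges(newEdges)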



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42489547
  
    Merged build finished. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41125074
  
     Merged build triggered. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42754279
  
     Merged build triggered. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42753773
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14873/



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42394658
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14757/



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12341257
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.Partitioner
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rdd.ShuffledRDD
    +import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +/**
    + * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
    + * the edge partition references `vid` in the specified `position` (src, dst, or both).
    +*/
    +private[graphx]
    +class RoutingTableMessage(
    +    var vid: VertexId,
    +    var pid: PartitionID,
    +    var position: Byte)
    +  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
    +  override def _1 = vid
    +  override def _2 = (pid, position)
    +  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
    +}
    +
    +private[graphx]
    +class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
    +  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
    +  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
    +    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
    +      .setSerializer(new RoutingTableMessageSerializer)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTableMessageRDDFunctions {
    +  import scala.language.implicitConversions
    +
    +  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
    +    new RoutingTableMessageRDDFunctions(rdd)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTablePartition {
    +  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
    +
    +  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
    +  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
    +    : Iterator[RoutingTableMessage] = {
    +    // Determine which positions each vertex id appears in using a map where the low 2 bits
    +    // represent src and dst
    +    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
    +    edgePartition.srcIds.iterator.foreach { srcId =>
    +      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
    +    }
    +    edgePartition.dstIds.iterator.foreach { dstId =>
    +      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
    +    }
    +    map.iterator.map { case (vid, position) => new RoutingTableMessage(vid, pid, position) }
    --- End diff --
    
    map with case is also probably slower than it needs to be
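
    A sketch of the alternative the same reviewer spells out elsewhere in
    this thread: bind the pair directly instead of pattern matching on it
    (the parameter name is hypothetical).

        map.iterator.map { vidAndPosition =>
          new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
        }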



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42397518
  
    Thanks for the comments! I implemented your suggestions as well as some cleanups I'd discussed with @rxin, then benchmarked it on 16 m2.4xlarge machines with the uk-2007-05 web graph for 10 iterations of PageRank to make sure it didn't introduce a performance regression.
    
    Commit | Runtime +/- SEM
    --- | ---
    pre-unify-rdds (f96ea3a02f64e6e0fec4dcb1f498c6bcde4af699) | 418 +/- 17.1 s
    unify-rdds (57202e81cc89056d86d9d9de18173b7553300a97) | 390.5 +/- 13.9 s




[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12454278
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
    @@ -212,9 +275,34 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
       }
     
       /**
    +   * Get an iterator over the edge triplets in this partition.
    +   *
    +   * It is safe to keep references to the objects from this iterator.
    +   */
    +  def tripletIterator(
    +      includeSrc: Boolean = true, includeDst: Boolean = true): Iterator[EdgeTriplet[VD, ED]] = {
    --- End diff --
    
    Consider making EdgePartition know its own includeSrc and includeDst, then assert equality here
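
    A hypothetical skeleton of the suggestion (the field names are
    assumptions, not the real implementation):

        class EdgePartitionWithLevel(val hasSrcAttrs: Boolean, val hasDstAttrs: Boolean) {
          def tripletIterator(includeSrc: Boolean = true, includeDst: Boolean = true) {
            // Fail fast if a caller requests attributes this partition
            // never received.
            assert(includeSrc == hasSrcAttrs && includeDst == hasDstAttrs)
          }
        }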



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41126492
  
    Merged build finished. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42483842
  
     Merged build triggered. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12341223
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala ---
    @@ -0,0 +1,237 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.language.higherKinds
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.util.collection.BitSet
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +/**
     + * A class containing additional operations for subclasses of VertexPartitionBase that provide
    + * implicit evidence of membership in the `VertexPartitionBaseOpsConstructor` typeclass (for
    + * example, [[VertexPartition.VertexPartitionOpsConstructor]]).
    + */
    +private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: VertexPartitionBase[X]]
    +    (self: T[VD])
    +    (implicit ev: VertexPartitionBaseOpsConstructor[T])
    +  extends Logging {
    +
    +  def withIndex(index: VertexIdToIndexMap): T[VD]
    +  def withValues[VD2: ClassTag](values: Array[VD2]): T[VD2]
    +  def withMask(mask: BitSet): T[VD]
    +
    +  /**
    +   * Pass each vertex attribute along with the vertex id through a map
    +   * function and retain the original RDD's partitioning and index.
    +   *
    +   * @tparam VD2 the type returned by the map function
    +   *
    +   * @param f the function applied to each vertex id and vertex
    +   * attribute in the RDD
    +   *
    +   * @return a new VertexPartition with values obtained by applying `f` to
    +   * each of the entries in the original VertexRDD.  The resulting
    +   * VertexPartition retains the same index.
    +   */
    +  def map[VD2: ClassTag](f: (VertexId, VD) => VD2): T[VD2] = {
    +    // Construct a view of the map transformation
    +    val newValues = new Array[VD2](self.capacity)
    +    var i = self.mask.nextSetBit(0)
    +    while (i >= 0) {
    +      newValues(i) = f(self.index.getValue(i), self.values(i))
    +      i = self.mask.nextSetBit(i + 1)
    +    }
    +    this.withValues(newValues)
    +  }
    +
    +  /**
    +   * Restrict the vertex set to the set of vertices satisfying the given predicate.
    +   *
    +   * @param pred the user defined predicate
    +   *
    +   * @note The vertex set preserves the original index structure which means that the returned
    +   *       RDD can be easily joined with the original vertex-set. Furthermore, the filter only
    +   *       modifies the bitmap index and so no new values are allocated.
    +   */
    +  def filter(pred: (VertexId, VD) => Boolean): T[VD] = {
    +    // Allocate the array to store the results into
    +    val newMask = new BitSet(self.capacity)
    +    // Iterate over the active bits in the old mask and evaluate the predicate
    +    var i = self.mask.nextSetBit(0)
    +    while (i >= 0) {
    +      if (pred(self.index.getValue(i), self.values(i))) {
    +        newMask.set(i)
    +      }
    +      i = self.mask.nextSetBit(i + 1)
    +    }
    +    this.withMask(newMask)
    +  }
    +
    +  /**
    +   * Hides vertices that are the same between this and other. For vertices that are different, keeps
    +   * the values from `other`. The indices of `this` and `other` must be the same.
    +   */
    +  def diff(other: T[VD]): T[VD] = {
    +    if (self.index != other.index) {
    +      logWarning("Diffing two VertexPartitions with different indexes is slow.")
    +      diff(createUsingIndex(other.iterator))
    +    } else {
    +      val newMask = self.mask & other.mask
    +      var i = newMask.nextSetBit(0)
    +      while (i >= 0) {
    +        if (self.values(i) == other.values(i)) {
    +          newMask.unset(i)
    +        }
    +        i = newMask.nextSetBit(i + 1)
    +      }
    +      ev.toOps(this.withValues(other.values)).withMask(newMask)
    +    }
    +  }
    +
    +  /** Left outer join another VertexPartition. */
    +  def leftJoin[VD2: ClassTag, VD3: ClassTag]
    +      (other: T[VD2])
    +      (f: (VertexId, VD, Option[VD2]) => VD3): T[VD3] = {
    +    if (self.index != other.index) {
    +      logWarning("Joining two VertexPartitions with different indexes is slow.")
    +      leftJoin(createUsingIndex(other.iterator))(f)
    +    } else {
    +      val newValues = new Array[VD3](self.capacity)
    +
    +      var i = self.mask.nextSetBit(0)
    +      while (i >= 0) {
    +        val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None
    +        newValues(i) = f(self.index.getValue(i), self.values(i), otherV)
    +        i = self.mask.nextSetBit(i + 1)
    +      }
    +      this.withValues(newValues)
    +    }
    +  }
    +
    +  /** Left outer join another iterator of messages. */
    +  def leftJoin[VD2: ClassTag, VD3: ClassTag]
    +      (other: Iterator[(VertexId, VD2)])
    +      (f: (VertexId, VD, Option[VD2]) => VD3): T[VD3] = {
    +    leftJoin(createUsingIndex(other))(f)
    +  }
    +
    +  /** Inner join another VertexPartition. */
    +  def innerJoin[U: ClassTag, VD2: ClassTag]
    +      (other: T[U])
    +      (f: (VertexId, VD, U) => VD2): T[VD2] = {
    +    if (self.index != other.index) {
    +      logWarning("Joining two VertexPartitions with different indexes is slow.")
    +      innerJoin(createUsingIndex(other.iterator))(f)
    +    } else {
    +      val newMask = self.mask & other.mask
    +      val newValues = new Array[VD2](self.capacity)
    +      var i = newMask.nextSetBit(0)
    +      while (i >= 0) {
    +        newValues(i) = f(self.index.getValue(i), self.values(i), other.values(i))
    +        i = newMask.nextSetBit(i + 1)
    +      }
    +      ev.toOps(this.withValues(newValues)).withMask(newMask)
    +    }
    +  }
    +
    +  /**
    +   * Inner join an iterator of messages.
    +   */
    +  def innerJoin[U: ClassTag, VD2: ClassTag]
    +      (iter: Iterator[Product2[VertexId, U]])
    +      (f: (VertexId, VD, U) => VD2): T[VD2] = {
    +    innerJoin(createUsingIndex(iter))(f)
    +  }
    +
    +  /**
    +   * Similar effect as aggregateUsingIndex((a, b) => a)
    +   */
    +  def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
    +    : T[VD2] = {
    +    val newMask = new BitSet(self.capacity)
    +    val newValues = new Array[VD2](self.capacity)
    +    iter.foreach { case (vid, vdata) =>
    --- End diff --
    
    Ditto on the foreach with case



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42753687
  
    Merged build started. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42351663
  
    Build finished. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12341175
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala ---
    @@ -0,0 +1,237 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.language.higherKinds
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.util.collection.BitSet
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +/**
     + * A class containing additional operations for subclasses of VertexPartitionBase that provide
    + * implicit evidence of membership in the `VertexPartitionBaseOpsConstructor` typeclass (for
    + * example, [[VertexPartition.VertexPartitionOpsConstructor]]).
    + */
    +private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: VertexPartitionBase[X]]
    +    (self: T[VD])
    +    (implicit ev: VertexPartitionBaseOpsConstructor[T])
    +  extends Logging {
    +
    +  def withIndex(index: VertexIdToIndexMap): T[VD]
    +  def withValues[VD2: ClassTag](values: Array[VD2]): T[VD2]
    +  def withMask(mask: BitSet): T[VD]
    +
    +  /**
    +   * Pass each vertex attribute along with the vertex id through a map
    +   * function and retain the original RDD's partitioning and index.
    +   *
    +   * @tparam VD2 the type returned by the map function
    +   *
    +   * @param f the function applied to each vertex id and vertex
    +   * attribute in the RDD
    +   *
    +   * @return a new VertexPartition with values obtained by applying `f` to
    +   * each of the entries in the original VertexRDD.  The resulting
    +   * VertexPartition retains the same index.
    +   */
    +  def map[VD2: ClassTag](f: (VertexId, VD) => VD2): T[VD2] = {
    +    // Construct a view of the map transformation
    +    val newValues = new Array[VD2](self.capacity)
    +    var i = self.mask.nextSetBit(0)
    +    while (i >= 0) {
    +      newValues(i) = f(self.index.getValue(i), self.values(i))
    +      i = self.mask.nextSetBit(i + 1)
    +    }
    +    this.withValues(newValues)
    +  }
    +
    +  /**
    +   * Restrict the vertex set to the set of vertices satisfying the given predicate.
    +   *
    +   * @param pred the user defined predicate
    +   *
    +   * @note The vertex set preserves the original index structure which means that the returned
    +   *       RDD can be easily joined with the original vertex-set. Furthermore, the filter only
    +   *       modifies the bitmap index and so no new values are allocated.
    +   */
    +  def filter(pred: (VertexId, VD) => Boolean): T[VD] = {
    +    // Allocate the array to store the results into
    +    val newMask = new BitSet(self.capacity)
    +    // Iterate over the active bits in the old mask and evaluate the predicate
    +    var i = self.mask.nextSetBit(0)
    +    while (i >= 0) {
    +      if (pred(self.index.getValue(i), self.values(i))) {
    +        newMask.set(i)
    +      }
    +      i = self.mask.nextSetBit(i + 1)
    +    }
    +    this.withMask(newMask)
    +  }
    +
    +  /**
    +   * Hides vertices that are the same between this and other. For vertices that are different, keeps
    +   * the values from `other`. The indices of `this` and `other` must be the same.
    +   */
    +  def diff(other: T[VD]): T[VD] = {
    +    if (self.index != other.index) {
    +      logWarning("Diffing two VertexPartitions with different indexes is slow.")
    +      diff(createUsingIndex(other.iterator))
    +    } else {
    +      val newMask = self.mask & other.mask
    +      var i = newMask.nextSetBit(0)
    +      while (i >= 0) {
    +        if (self.values(i) == other.values(i)) {
    +          newMask.unset(i)
    +        }
    +        i = newMask.nextSetBit(i + 1)
    +      }
    +      ev.toOps(this.withValues(other.values)).withMask(newMask)
    +    }
    +  }
    +
    +  /** Left outer join another VertexPartition. */
    +  def leftJoin[VD2: ClassTag, VD3: ClassTag]
    +      (other: T[VD2])
    +      (f: (VertexId, VD, Option[VD2]) => VD3): T[VD3] = {
    +    if (self.index != other.index) {
    +      logWarning("Joining two VertexPartitions with different indexes is slow.")
    +      leftJoin(createUsingIndex(other.iterator))(f)
    +    } else {
    +      val newValues = new Array[VD3](self.capacity)
    +
    +      var i = self.mask.nextSetBit(0)
    +      while (i >= 0) {
    +        val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None
    +        newValues(i) = f(self.index.getValue(i), self.values(i), otherV)
    +        i = self.mask.nextSetBit(i + 1)
    +      }
    +      this.withValues(newValues)
    +    }
    +  }
    +
    +  /** Left outer join another iterator of messages. */
    +  def leftJoin[VD2: ClassTag, VD3: ClassTag]
    +      (other: Iterator[(VertexId, VD2)])
    +      (f: (VertexId, VD, Option[VD2]) => VD3): T[VD3] = {
    +    leftJoin(createUsingIndex(other))(f)
    +  }
    +
    +  /** Inner join another VertexPartition. */
    +  def innerJoin[U: ClassTag, VD2: ClassTag]
    +      (other: T[U])
    +      (f: (VertexId, VD, U) => VD2): T[VD2] = {
    +    if (self.index != other.index) {
    +      logWarning("Joining two VertexPartitions with different indexes is slow.")
    +      innerJoin(createUsingIndex(other.iterator))(f)
    +    } else {
    +      val newMask = self.mask & other.mask
    +      val newValues = new Array[VD2](self.capacity)
    +      var i = newMask.nextSetBit(0)
    +      while (i >= 0) {
    +        newValues(i) = f(self.index.getValue(i), self.values(i), other.values(i))
    +        i = newMask.nextSetBit(i + 1)
    +      }
    +      ev.toOps(this.withValues(newValues)).withMask(newMask)
    +    }
    +  }
    +
    +  /**
    +   * Inner join an iterator of messages.
    +   */
    +  def innerJoin[U: ClassTag, VD2: ClassTag]
    +      (iter: Iterator[Product2[VertexId, U]])
    +      (f: (VertexId, VD, U) => VD2): T[VD2] = {
    +    innerJoin(createUsingIndex(iter))(f)
    +  }
    +
    +  /**
    +   * Similar effect as aggregateUsingIndex((a, b) => a)
    +   */
    +  def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
    +    : T[VD2] = {
    +    val newMask = new BitSet(self.capacity)
    +    val newValues = new Array[VD2](self.capacity)
    +    iter.foreach { case (vid, vdata) =>
    +      val pos = self.index.getPos(vid)
    +      if (pos >= 0) {
    +        newMask.set(pos)
    +        newValues(pos) = vdata
    +      }
    +    }
    +    ev.toOps(this.withValues(newValues)).withMask(newMask)
    +  }
    +
    +  /**
    +   * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
    +   * the partition, hidden by the bitmask.
    +   */
    +  def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): T[VD] = {
    +    val newMask = new BitSet(self.capacity)
    +    val newValues = new Array[VD](self.capacity)
    +    System.arraycopy(self.values, 0, newValues, 0, newValues.length)
    +    iter.foreach { case (vid, vdata) =>
    --- End diff --
    
    `foreach` with `case` is actually pretty slow in Scala, as it does an isInstanceOf and other funny stuff to do pattern matching inside. It would be better to do `iter.foreach { p => ...` and use `p._1` and `p._2`.
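
    A standalone comparison of the two forms, on toy data with hypothetical
    names:

        val withCase = Iterator((1L, "a"), (2L, "b"))
        withCase.foreach { case (vid, vdata) =>
          println(vid + " -> " + vdata) // tuple destructured via pattern matching
        }

        val withAccessors = Iterator((1L, "a"), (2L, "b"))
        withAccessors.foreach { p =>
          println(p._1 + " -> " + p._2) // direct field access, as suggested
        }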



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41124613
  
     Merged build triggered. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42489551
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14786/



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42391060
  
    Build started. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42754281
  
    Merged build started. 



[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12341289
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.language.higherKinds
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.util.collection.BitSet
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +private[graphx] object VertexPartitionBase {
    +  /**
    +   * Construct the constituents of a VertexPartitionBase from the given vertices, merging duplicate
    +   * entries arbitrarily.
    +   */
    +  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
    +    : (VertexIdToIndexMap, Array[VD], BitSet) = {
    +    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
    +    iter.foreach { case (k, v) =>
    --- End diff --
    
    foreach with case


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41124574
  
    cc @rxin @jegonzal


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12456652
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.Partitioner
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rdd.ShuffledRDD
    +import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +/**
    + * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
    + * the edge partition references `vid` in the specified `position` (src, dst, or both).
    +*/
    +private[graphx]
    +class RoutingTableMessage(
    +    var vid: VertexId,
    +    var pid: PartitionID,
    +    var position: Byte)
    +  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
    +  override def _1 = vid
    +  override def _2 = (pid, position)
    +  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
    +}
    +
    +private[graphx]
    +class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
    +  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
    +  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
    +    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
    +      .setSerializer(new RoutingTableMessageSerializer)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTableMessageRDDFunctions {
    +  import scala.language.implicitConversions
    +
    +  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
    +    new RoutingTableMessageRDDFunctions(rdd)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTablePartition {
    +  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
    +
    +  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
    +  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
    +    : Iterator[RoutingTableMessage] = {
    +    // Determine which positions each vertex id appears in using a map where the low 2 bits
    +    // represent src and dst
    +    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
    +    edgePartition.srcIds.iterator.foreach { srcId =>
    +      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
    +    }
    +    edgePartition.dstIds.iterator.foreach { dstId =>
    +      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
    +    }
    +    map.iterator.map { vidAndPosition =>
    +      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
    +    }
    +  }
    +
    +  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
    +  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
    +    : RoutingTablePartition = {
    +    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
    +    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
    --- End diff --
    
    Use Scala BitSet, which is auto-resizing, avoiding the copy
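
    A rough sketch of that suggestion, assuming the surrounding names from
    `fromMsgs` (scala.collection.mutable.BitSet grows on demand, so the
    PrimitiveVector[Boolean] plus toBitSet compaction pass goes away):

        import scala.collection.mutable.{BitSet => ScalaBitSet}

        val srcFlags = Array.fill(numEdgePartitions)(new ScalaBitSet)
        val dstFlags = Array.fill(numEdgePartitions)(new ScalaBitSet)
        for (msg <- iter) {
          // index of this vid within its partition's vector, before appending
          val i = pid2vid(msg.pid).size
          pid2vid(msg.pid) += msg.vid
          if ((msg.position & 0x1) != 0) srcFlags(msg.pid) += i
          if ((msg.position & 0x2) != 0) dstFlags(msg.pid) += i
        }

    The routing table would then either store these directly or convert them
    once at the end.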


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42491311
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14787/


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12455215
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala ---
    @@ -89,105 +81,79 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
         }
           .partitionBy(new HashPartitioner(numPartitions))
           .mapPartitionsWithIndex( { (pid, iter) =>
    -        val builder = new EdgePartitionBuilder[ED]()(edTag)
    +        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
             iter.foreach { message =>
               val data = message.data
               builder.add(data._1, data._2, data._3)
             }
             val edgePartition = builder.toEdgePartition
             Iterator((pid, edgePartition))
    -      }, preservesPartitioning = true).cache())
    -    GraphImpl(vertices, newEdges)
    +      }, preservesPartitioning = true))
    +    GraphImpl.fromExistingRDDs(vertices, newEdges)
       }
     
       override def reverse: Graph[VD, ED] = {
    -    val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
    -    GraphImpl(vertices, newETable)
    +    new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
       }
     
       override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
    --- End diff --
    
    Introduce private mapVerticesDelta to work around SPARK-1552


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42480525
  
    Merged build started. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41126494
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14356/


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42480505
  
     Merged build triggered. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12453687
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala ---
    @@ -276,14 +286,31 @@ class VertexRDD[@specialized VD: ClassTag](
        */
       def aggregateUsingIndex[VD2: ClassTag](
           messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
    -    val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
    +    val shuffled = messages.copartitionWithVertices(this.partitioner.get)
         val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
    -      val vertexPartition: VertexPartition[VD] = thisIter.next()
    -      Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
    +      thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
         }
         new VertexRDD[VD2](parts)
       }
     
    +  /**
    +   * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
    +   * [[EdgeRDD]].
    +   */
    +  def reverseRoutingTables(): VertexRDD[VD] =
    --- End diff --
    
    Should be private[graphx]


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42484874
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14783/


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42483855
  
    Merged build started. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42487477
  
     Merged build triggered. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41124706
  
    Merged build finished. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41126018
  
    Merged build started. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42755276
  
    Merged build finished. All automated tests passed.


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41125076
  
    Merged build started. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42755277
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14874/


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12456744
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.Partitioner
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rdd.ShuffledRDD
    +import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +/**
    + * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
    + * the edge partition references `vid` in the specified `position` (src, dst, or both).
    +*/
    +private[graphx]
    +class RoutingTableMessage(
    +    var vid: VertexId,
    +    var pid: PartitionID,
    +    var position: Byte)
    +  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
    +  override def _1 = vid
    +  override def _2 = (pid, position)
    +  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
    +}
    +
    +private[graphx]
    +class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
    +  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
    +  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
    +    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
    +      .setSerializer(new RoutingTableMessageSerializer)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTableMessageRDDFunctions {
    +  import scala.language.implicitConversions
    +
    +  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
    +    new RoutingTableMessageRDDFunctions(rdd)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTablePartition {
    +  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
    +
    +  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
    +  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
    +    : Iterator[RoutingTableMessage] = {
    +    // Determine which positions each vertex id appears in using a map where the low 2 bits
    +    // represent src and dst
    +    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
    +    edgePartition.srcIds.iterator.foreach { srcId =>
    +      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
    +    }
    +    edgePartition.dstIds.iterator.foreach { dstId =>
    +      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
    +    }
    +    map.iterator.map { vidAndPosition =>
    +      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
    +    }
    +  }
    +
    +  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
    +  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
    +    : RoutingTablePartition = {
    +    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
    +    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
    +    val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
    +    for (msg <- iter) {
    +      pid2vid(msg.pid) += msg.vid
    +      srcFlags(msg.pid) += (msg.position & 0x1) != 0
    +      dstFlags(msg.pid) += (msg.position & 0x2) != 0
    +    }
    +
    +    new RoutingTablePartition(pid2vid.zipWithIndex.map {
    +      case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
    +    })
    +  }
    +
    +  /** Compact the given vector of Booleans into a BitSet. */
    +  private def toBitSet(flags: PrimitiveVector[Boolean]): BitSet = {
    +    val bitset = new BitSet(flags.size)
    +    var i = 0
    +    while (i < flags.size) {
    +      if (flags(i)) {
    +        bitset.set(i)
    +      }
    +      i += 1
    +    }
    +    bitset
    +  }
    +}
    +
    +/**
    + * Stores the locations of edge-partition join sites for each vertex attribute in a particular
    + * vertex partition. This provides routing information for shipping vertex attributes to edge
    + * partitions.
    + */
    +private[graphx]
    +class RoutingTablePartition(
    +    private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) {
    +  /** The maximum number of edge partitions this `RoutingTablePartition` is built to join with. */
    +  val numEdgePartitions: Int = routingTable.size
    +
    +  /** Returns the number of vertices that will be sent to the specified edge partition. */
    +  def partitionSize(pid: PartitionID): Int = routingTable(pid)._1.size
    +
    +  /** Returns an iterator over all vertex ids stored in this `RoutingTablePartition`. */
    +  def iterator: Iterator[VertexId] = routingTable.iterator.flatMap(_._1.iterator)
    +
    +  /** Returns a new RoutingTablePartition reflecting a reversal of all edge directions. */
    +  def reverse: RoutingTablePartition = {
    +    new RoutingTablePartition(routingTable.map {
    +      case (vids, srcVids, dstVids) => (vids, dstVids, srcVids)
    +    })
    +  }
    +
    +  /**
    +   * Runs `f` on each vertex id to be sent to the specified edge partition. Vertex ids can be
    +   * filtered by the position they have in the edge partition.
    +   */
    +  def foreachWithinEdgePartition
    +      (pid: PartitionID, includeSrc: Boolean, includeDst: Boolean)
    +      (f: VertexId => Unit) {
    +    val (vidsCandidate, srcVids, dstVids) = routingTable(pid)
    +    val size = vidsCandidate.length
    +    if (includeSrc && includeDst) {
    +      // Avoid checks for performance
    +      vidsCandidate.iterator.foreach(f)
    +    } else if (!includeSrc && !includeDst) {
    +      // Do nothing
    +    } else {
    +      val relevantVids = if (includeSrc) srcVids else dstVids
    +      relevantVids.iterator.foreach { i => f(vidsCandidate(i)) }
    --- End diff --
    
    Switch to while
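
    Roughly, assuming Spark's `BitSet.nextSetBit(from)` semantics (returns -1
    once exhausted):

        val relevantVids = if (includeSrc) srcVids else dstVids
        var i = relevantVids.nextSetBit(0)
        while (i >= 0) {
          f(vidsCandidate(i))
          i = relevantVids.nextSetBit(i + 1)
        }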


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41127456
  
    Merged build finished. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42491309
  
    Merged build finished. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42351668
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14726/


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12454560
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
    @@ -100,7 +147,23 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
           i += 1
         }
         assert(newData.size == i)
    -    new EdgePartition(srcIds, dstIds, newData, index)
    +    this.withData(newData)
    +  }
    +
    +  /**
    +   * Construct a new edge partition containing only the edges matching `epred` and where both
    +   * vertices match `vpred`.
    +   */
    +  def filter(
    --- End diff --
    
    TODO: filter the clustered edges using `vpred` first


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12456795
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala ---
    @@ -26,6 +26,35 @@ import org.apache.spark.serializer._
     import scala.language.existentials
     
     private[graphx]
    +class RoutingTableMessageSerializer extends Serializer with Serializable {
    +  override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
    +
    +    override def serializeStream(s: OutputStream): SerializationStream =
    +      new ShuffleSerializationStream(s) {
    +        def writeObject[T](t: T): SerializationStream = {
    +          val msg = t.asInstanceOf[RoutingTableMessage]
    +          writeVarLong(msg.vid, optimizePositive = false)
    +          writeUnsignedVarInt(msg.pid)
    +          // TODO: Write only the bottom two bits of msg.position
    --- End diff --
    
    Remove this TODO


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12456738
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.Partitioner
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rdd.ShuffledRDD
    +import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +/**
    + * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
    + * the edge partition references `vid` in the specified `position` (src, dst, or both).
    +*/
    +private[graphx]
    +class RoutingTableMessage(
    +    var vid: VertexId,
    +    var pid: PartitionID,
    +    var position: Byte)
    +  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
    +  override def _1 = vid
    +  override def _2 = (pid, position)
    +  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
    +}
    +
    +private[graphx]
    +class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
    +  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
    +  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
    +    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
    +      .setSerializer(new RoutingTableMessageSerializer)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTableMessageRDDFunctions {
    +  import scala.language.implicitConversions
    +
    +  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
    +    new RoutingTableMessageRDDFunctions(rdd)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTablePartition {
    +  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
    +
    +  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
    +  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
    +    : Iterator[RoutingTableMessage] = {
    +    // Determine which positions each vertex id appears in using a map where the low 2 bits
    +    // represent src and dst
    +    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
    +    edgePartition.srcIds.iterator.foreach { srcId =>
    +      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
    +    }
    +    edgePartition.dstIds.iterator.foreach { dstId =>
    +      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
    +    }
    +    map.iterator.map { vidAndPosition =>
    +      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
    +    }
    +  }
    +
    +  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
    +  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
    +    : RoutingTablePartition = {
    +    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
    +    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
    +    val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
    +    for (msg <- iter) {
    +      pid2vid(msg.pid) += msg.vid
    +      srcFlags(msg.pid) += (msg.position & 0x1) != 0
    +      dstFlags(msg.pid) += (msg.position & 0x2) != 0
    +    }
    +
    +    new RoutingTablePartition(pid2vid.zipWithIndex.map {
    +      case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
    +    })
    +  }
    +
    +  /** Compact the given vector of Booleans into a BitSet. */
    +  private def toBitSet(flags: PrimitiveVector[Boolean]): BitSet = {
    +    val bitset = new BitSet(flags.size)
    +    var i = 0
    +    while (i < flags.size) {
    +      if (flags(i)) {
    +        bitset.set(i)
    +      }
    +      i += 1
    +    }
    +    bitset
    +  }
    +}
    +
    +/**
    + * Stores the locations of edge-partition join sites for each vertex attribute in a particular
    + * vertex partition. This provides routing information for shipping vertex attributes to edge
    + * partitions.
    + */
    +private[graphx]
    +class RoutingTablePartition(
    +    private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) {
    +  /** The maximum number of edge partitions this `RoutingTablePartition` is built to join with. */
    +  val numEdgePartitions: Int = routingTable.size
    +
    +  /** Returns the number of vertices that will be sent to the specified edge partition. */
    +  def partitionSize(pid: PartitionID): Int = routingTable(pid)._1.size
    +
    +  /** Returns an iterator over all vertex ids stored in this `RoutingTablePartition`. */
    +  def iterator: Iterator[VertexId] = routingTable.iterator.flatMap(_._1.iterator)
    +
    +  /** Returns a new RoutingTablePartition reflecting a reversal of all edge directions. */
    +  def reverse: RoutingTablePartition = {
    +    new RoutingTablePartition(routingTable.map {
    +      case (vids, srcVids, dstVids) => (vids, dstVids, srcVids)
    +    })
    +  }
    +
    +  /**
    +   * Runs `f` on each vertex id to be sent to the specified edge partition. Vertex ids can be
    +   * filtered by the position they have in the edge partition.
    +   */
    +  def foreachWithinEdgePartition
    +      (pid: PartitionID, includeSrc: Boolean, includeDst: Boolean)
    +      (f: VertexId => Unit) {
    +    val (vidsCandidate, srcVids, dstVids) = routingTable(pid)
    +    val size = vidsCandidate.length
    +    if (includeSrc && includeDst) {
    +      // Avoid checks for performance
    +      vidsCandidate.iterator.foreach(f)
    --- End diff --
    
    Switch to while
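
    i.e., using the surrounding names, something like:

        var i = 0
        while (i < vidsCandidate.length) {
          f(vidsCandidate(i))
          i += 1
        }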


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42453506
  
    @ankurdave this needs to be up-merged with master


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/497


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42453595
  
    @ankurdave also - do you mind writing a brief "upgrade guide" in the GraphX docs with the major user-facing interface changes from 0.9?


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42753649
  
    Jenkins, test this please


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42483761
  
    @pwendell Merged and added an upgrade section to the GraphX programming guide.


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12454050
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
    @@ -100,7 +147,23 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
           i += 1
         }
         assert(newData.size == i)
    -    new EdgePartition(srcIds, dstIds, newData, index)
    +    this.withData(newData)
    +  }
    +
    +  /**
    +   * Construct a new edge partition containing only the edges matching `epred` and where both
    +   * vertices match `vpred`.
    --- End diff --
    
    Assumes the EdgePartition is upgraded to full triplets


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12456334
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.Partitioner
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rdd.ShuffledRDD
    +import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +/**
    + * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
    + * the edge partition references `vid` in the specified `position` (src, dst, or both).
    +*/
    +private[graphx]
    +class RoutingTableMessage(
    +    var vid: VertexId,
    +    var pid: PartitionID,
    +    var position: Byte)
    +  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
    +  override def _1 = vid
    +  override def _2 = (pid, position)
    +  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
    +}
    +
    +private[graphx]
    +class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
    +  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
    +  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
    +    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
    +      .setSerializer(new RoutingTableMessageSerializer)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTableMessageRDDFunctions {
    +  import scala.language.implicitConversions
    +
    +  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
    +    new RoutingTableMessageRDDFunctions(rdd)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTablePartition {
    +  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
    +
    +  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
    +  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
    +    : Iterator[RoutingTableMessage] = {
    +    // Determine which positions each vertex id appears in using a map where the low 2 bits
    +    // represent src and dst
    +    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
    +    edgePartition.srcIds.iterator.foreach { srcId =>
    +      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
    +    }
    +    edgePartition.dstIds.iterator.foreach { dstId =>
    +      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
    +    }
    +    map.iterator.map { vidAndPosition =>
    +      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
    +    }
    +  }
    +
    +  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
    +  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
    +    : RoutingTablePartition = {
    +    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
    +    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
    --- End diff --
    
    Merge srcFlags and dstFlags into a PrimitiveVector[Byte]
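
    A rough sketch of the merged layout, reusing the 2-bit position encoding
    already carried by the messages (names from the surrounding `fromMsgs`):

        val positions = Array.fill(numEdgePartitions)(new PrimitiveVector[Byte])
        for (msg <- iter) {
          pid2vid(msg.pid) += msg.vid
          positions(msg.pid) += msg.position  // 0x1 bit = src, 0x2 bit = dst
        }
        // A toBitSet-style compaction would then test (b & 0x1) and (b & 0x2)
        // per entry instead of reading two parallel Boolean vectors.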


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42391053
  
     Build triggered. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12340674
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala ---
    @@ -26,6 +26,33 @@ import org.apache.spark.serializer._
     import scala.language.existentials
     
     private[graphx]
    +class RoutingTableMessageSerializer extends Serializer with Serializable {
    +  override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
    +
    +    override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
    +      def writeObject[T](t: T) = {
    +        val msg = t.asInstanceOf[RoutingTableMessage]
    +        writeVarLong(msg.vid, optimizePositive = false)
    +        writeUnsignedVarInt(msg.pid)
    +        // TODO: Write only the bottom two bits of msg.position
    +        s.write(msg.position)
    +        this
    +      }
    +    }
    +
    +    override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
    --- End diff --
    
    Add an explicit return type
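
    i.e., mirroring the `serializeStream` fix visible in the later revision of
    this diff; the body below is only a plausible sketch of where the
    annotation goes, not the exact code:

        override def deserializeStream(s: InputStream): DeserializationStream =
          new ShuffleDeserializationStream(s) {
            override def readObject[T](): T = {
              val vid = readVarLong(optimizePositive = false)
              val pid = readUnsignedVarInt()
              val position = s.read().toByte
              new RoutingTableMessage(vid, pid, position).asInstanceOf[T]
            }
          }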


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42753772
  
    Merged build finished. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42487496
  
    Merged build started. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12341328
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.language.higherKinds
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.util.collection.BitSet
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +private[graphx] object VertexPartitionBase {
    +  /**
    +   * Construct the constituents of a VertexPartitionBase from the given vertices, merging duplicate
    +   * entries arbitrarily.
    +   */
    +  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
    +    : (VertexIdToIndexMap, Array[VD], BitSet) = {
    +    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
    +    iter.foreach { case (k, v) =>
    +      map(k) = v
    +    }
    +    (map.keySet, map._values, map.keySet.getBitSet)
    +  }
    +
    +  /**
    +   * Construct the constituents of a VertexPartitionBase from the given vertices, merging duplicate
    +   * entries using `mergeFunc`.
    +   */
    +  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
    +    : (VertexIdToIndexMap, Array[VD], BitSet) = {
    +    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
    +    iter.foreach { case (k, v) =>
    --- End diff --
    
    foreach with case


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by ankurdave <gi...@git.apache.org>.
Github user ankurdave commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12456417
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.graphx.impl
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.Partitioner
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rdd.ShuffledRDD
    +import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
    +
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
    +
    +/**
    + * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
    + * the edge partition references `vid` in the specified `position` (src, dst, or both).
    +*/
    +private[graphx]
    +class RoutingTableMessage(
    +    var vid: VertexId,
    +    var pid: PartitionID,
    +    var position: Byte)
    +  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
    +  override def _1 = vid
    +  override def _2 = (pid, position)
    +  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
    +}
    +
    +private[graphx]
    +class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
    +  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
    +  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
    +    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
    +      .setSerializer(new RoutingTableMessageSerializer)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTableMessageRDDFunctions {
    +  import scala.language.implicitConversions
    +
    +  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
    +    new RoutingTableMessageRDDFunctions(rdd)
    +  }
    +}
    +
    +private[graphx]
    +object RoutingTablePartition {
    +  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
    +
    +  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
    +  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
    +    : Iterator[RoutingTableMessage] = {
    +    // Determine which positions each vertex id appears in using a map where the low 2 bits
    +    // represent src and dst
    +    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
    +    edgePartition.srcIds.iterator.foreach { srcId =>
    +      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
    +    }
    +    edgePartition.dstIds.iterator.foreach { dstId =>
    +      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
    +    }
    +    map.iterator.map { vidAndPosition =>
    +      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
    +    }
    +  }
    +
    +  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
    +  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
    +    : RoutingTablePartition = {
    +    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
    +    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
    --- End diff --
    
    @rxin What is the reason to use PrimitiveVector over ArrayBuilder? The latter specializes for all primitives, including Bytes: https://github.com/scala/scala/blob/v2.10.4/src/library/scala/collection/mutable/ArrayBuilder.scala#L40
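
    For readers comparing the two, a rough sketch of how each accumulates
    primitives (illustrative only, not code from this diff; PrimitiveVector
    is Spark-internal and, in this version, specialized only for Long, Int,
    and Double, while ArrayBuilder specializes for every primitive):

        import scala.collection.mutable.ArrayBuilder
        import org.apache.spark.util.collection.PrimitiveVector

        // Spark's growable primitive array; trim().array extracts the
        // backing array without the slack capacity. (private[spark], so
        // only usable from within Spark packages.)
        val vids = new PrimitiveVector[Long]
        vids += 1L
        val vidArray: Array[Long] = vids.trim().array

        // Scala's ArrayBuilder, specialized for all primitives, incl. Byte.
        val flags = ArrayBuilder.make[Byte]()
        flags += 0x1.toByte
        val flagArray: Array[Byte] = flags.result()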


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by jegonzal <gi...@git.apache.org>.
Github user jegonzal commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42618546
  
    I went through this PR with Ankur and it looks good to me.  There are a few minor changes but those can be moved to a second PR.


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42340917
  
     Build triggered. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42484873
  
    Merged build finished. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-41126010
  
     Merged build triggered. 


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42340645
  
    Jenkins, retest this please


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/497#discussion_r12340669
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala ---
    @@ -26,6 +26,33 @@ import org.apache.spark.serializer._
     import scala.language.existentials
     
     private[graphx]
    +class RoutingTableMessageSerializer extends Serializer with Serializable {
    +  override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
    +
    +    override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
    --- End diff --
    
    Add an explicit return type
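
    Presumably something like the following, assuming the intended exposed
    type is SerializationStream (which the anonymous subclass conforms to):

        override def serializeStream(s: OutputStream): SerializationStream =
          new ShuffleSerializationStream(s) {
            // ... body unchanged ...
          }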


---

[GitHub] spark pull request: Unify GraphImpl RDDs + other graph load optimi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/497#issuecomment-42394655
  
    Build finished. 


---