Posted to commits@spark.apache.org by rx...@apache.org on 2014/11/17 20:06:44 UTC

spark git commit: [SPARK-4444] Drop VD type parameter from EdgeRDD

Repository: spark
Updated Branches:
  refs/heads/master e7690ed20 -> 9ac2bb18e


[SPARK-4444] Drop VD type parameter from EdgeRDD

EdgeRDD previously took two type parameters, ED and VD, because each edge partition caches the vertex attributes of its endpoints to build the triplet view. That caching is an implementation detail that should not be exposed in the interface, so this PR drops the VD type parameter.

This requires removing the `filter` method from the EdgeRDD interface, because it depends on vertex attribute caching.
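
For callers, the visible effect is one fewer type parameter on `graph.edges`. A minimal before/after sketch of hypothetical user code (not part of this commit):

    import org.apache.spark.graphx._

    // Before: the vertex attribute type leaked into the edge type.
    //   val edges: EdgeRDD[Double, Int] = graph.edges
    // After: only the edge attribute type ED is visible.
    def totalWeight(graph: Graph[Int, Double]): Double = {
      val edges: EdgeRDD[Double] = graph.edges
      edges.map(_.attr).reduce(_ + _)
    }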

Author: Ankur Dave <an...@gmail.com>

Closes #3303 from ankurdave/edgerdd-drop-tparam and squashes the following commits:

38dca9b [Ankur Dave] Leave EdgeRDD.fromEdges public
fafeb51 [Ankur Dave] Drop VD type parameter from EdgeRDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ac2bb18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ac2bb18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ac2bb18

Branch: refs/heads/master
Commit: 9ac2bb18ede2e9f73c255fa33445af89aaf8a000
Parents: e7690ed
Author: Ankur Dave <an...@gmail.com>
Authored: Mon Nov 17 11:06:31 2014 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Nov 17 11:06:31 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/EdgeRDD.scala | 35 +++++++-------------
 .../scala/org/apache/spark/graphx/Graph.scala   |  2 +-
 .../org/apache/spark/graphx/VertexRDD.scala     | 10 +++---
 .../apache/spark/graphx/impl/EdgeRDDImpl.scala  | 24 +++++++-------
 .../apache/spark/graphx/impl/GraphImpl.scala    | 13 ++++----
 .../graphx/impl/ReplicatedVertexView.scala      |  4 +--
 .../spark/graphx/impl/VertexRDDImpl.scala       |  2 +-
 7 files changed, 40 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9ac2bb18/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 869ef15..cc70b39 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.graphx
 
+import scala.language.existentials
 import scala.reflect.ClassTag
 
 import org.apache.spark.Dependency
@@ -36,16 +37,16 @@ import org.apache.spark.graphx.impl.EdgeRDDImpl
  * edge to provide the triplet view. Shipping of the vertex attributes is managed by
  * `impl.ReplicatedVertexView`.
  */
-abstract class EdgeRDD[ED, VD](
+abstract class EdgeRDD[ED](
     @transient sc: SparkContext,
     @transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {
 
-  private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]
+  private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])] forSome { type VD }
 
   override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
+    val p = firstParent[(PartitionID, EdgePartition[ED, _])].iterator(part, context)
     if (p.hasNext) {
       p.next._2.iterator.map(_.copy())
     } else {
@@ -60,19 +61,14 @@ abstract class EdgeRDD[ED, VD](
    * @param f the function from an edge to a new edge value
    * @return a new EdgeRDD containing the new edge values
    */
-  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
+  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]
 
   /**
    * Reverse all the edges in this RDD.
    *
    * @return a new EdgeRDD containing all the edges reversed
    */
-  def reverse: EdgeRDD[ED, VD]
-
-  /** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
-  def filter(
-      epred: EdgeTriplet[VD, ED] => Boolean,
-      vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD]
+  def reverse: EdgeRDD[ED]
 
   /**
    * Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -84,15 +80,8 @@ abstract class EdgeRDD[ED, VD](
    *         with values supplied by `f`
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
-      (other: EdgeRDD[ED2, _])
-      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
-
-  private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
-      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2]
-
-  /** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */
-  private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
-      partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2]
+      (other: EdgeRDD[ED2])
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
 
   /**
    * Changes the target storage level while preserving all other properties of the
@@ -101,7 +90,7 @@ abstract class EdgeRDD[ED, VD](
    * This does not actually trigger a cache; to do this, call
    * [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
    */
-  private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED, VD]
+  private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED]
 }
 
 object EdgeRDD {
@@ -111,7 +100,7 @@ object EdgeRDD {
    * @tparam ED the edge attribute type
    * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
    */
-  def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
+  def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD] = {
     val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
       val builder = new EdgePartitionBuilder[ED, VD]
       iter.foreach { e =>
@@ -128,8 +117,8 @@ object EdgeRDD {
    * @tparam ED the edge attribute type
    * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
    */
-  def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
-      edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
+  private[graphx] def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
+      edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDDImpl[ED, VD] = {
     new EdgeRDDImpl(edgePartitions)
   }
 }
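
The `forSome` clause in the new `partitionsRDD` signature is what lets a single-parameter EdgeRDD keep exposing partitions whose element type still mentions a vertex attribute type. A self-contained sketch of the same trick, using hypothetical stand-in types rather than GraphX's:

    import scala.language.existentials

    // Each partition carries an internal type V that callers never see.
    class Part[E, V](val payload: Seq[E])

    abstract class Edges[E] {
      // The existential hides V; each implementation picks its own.
      def parts: Seq[Part[E, V]] forSome { type V }
    }

    class EdgesImpl[E, V](ps: Seq[Part[E, V]]) extends Edges[E] {
      def parts: Seq[Part[E, V]] = ps
    }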

http://git-wip-us.apache.org/repos/asf/spark/blob/9ac2bb18/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 2c1b951..6377915 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * along with their vertex data.
    *
    */
-  @transient val edges: EdgeRDD[ED, VD]
+  @transient val edges: EdgeRDD[ED]
 
   /**
    * An RDD containing the edge triplets, which are edges along with the vertex data associated with

http://git-wip-us.apache.org/repos/asf/spark/blob/9ac2bb18/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index f8be176..1db3df0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -207,7 +207,7 @@ abstract class VertexRDD[VD](
   def reverseRoutingTables(): VertexRDD[VD]
 
   /** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
-  def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD]
+  def withEdges(edges: EdgeRDD[_]): VertexRDD[VD]
 
   /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
   private[graphx] def withPartitionsRDD[VD2: ClassTag](
@@ -269,7 +269,7 @@ object VertexRDD {
    * @param defaultVal the vertex attribute to use when creating missing vertices
    */
   def apply[VD: ClassTag](
-      vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
+      vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_], defaultVal: VD): VertexRDD[VD] = {
     VertexRDD(vertices, edges, defaultVal, (a, b) => a)
   }
 
@@ -286,7 +286,7 @@ object VertexRDD {
    * @param mergeFunc the commutative, associative duplicate vertex attribute merge function
    */
   def apply[VD: ClassTag](
-      vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
+      vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_], defaultVal: VD, mergeFunc: (VD, VD) => VD
     ): VertexRDD[VD] = {
     val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
       case Some(p) => vertices
@@ -314,7 +314,7 @@ object VertexRDD {
    * @param defaultVal the vertex attribute to use when creating missing vertices
    */
   def fromEdges[VD: ClassTag](
-      edges: EdgeRDD[_, _], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
+      edges: EdgeRDD[_], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
     val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
     val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
       val routingTable =
@@ -325,7 +325,7 @@ object VertexRDD {
   }
 
   private[graphx] def createRoutingTables(
-      edges: EdgeRDD[_, _], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
+      edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
     // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
     val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
       Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
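
With the parameter now typed `EdgeRDD[_]`, a VertexRDD can be derived from an edge set without naming its hidden vertex type. A minimal usage sketch against the API above (the SparkContext is assumed to be set up elsewhere):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    def defaultVertices(sc: SparkContext): VertexRDD[Int] = {
      val raw: RDD[Edge[Double]] =
        sc.parallelize(Seq(Edge(1L, 2L, 0.5), Edge(2L, 3L, 1.5)))
      val edges: EdgeRDD[Double] = EdgeRDD.fromEdges[Double, Int](raw)
      // One vertex per id appearing in the edges, with the default attribute.
      VertexRDD.fromEdges(edges, numPartitions = 4, defaultVal = 0)
    }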

http://git-wip-us.apache.org/repos/asf/spark/blob/9ac2bb18/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 4100a85..a816961 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -28,7 +28,7 @@ import org.apache.spark.graphx._
 class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
     override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
     val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
-  extends EdgeRDD[ED, VD](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+  extends EdgeRDD[ED](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   override def setName(_name: String): this.type = {
     if (partitionsRDD.name != null) {
@@ -75,20 +75,20 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
     partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
   }
 
-  override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
+  override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, VD] =
     mapEdgePartitions((pid, part) => part.map(f))
 
-  override def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+  override def reverse: EdgeRDDImpl[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
 
-  override def filter(
+  def filter(
       epred: EdgeTriplet[VD, ED] => Boolean,
-      vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
+      vpred: (VertexId, VD) => Boolean): EdgeRDDImpl[ED, VD] = {
     mapEdgePartitions((pid, part) => part.filter(epred, vpred))
   }
 
   override def innerJoin[ED2: ClassTag, ED3: ClassTag]
-      (other: EdgeRDD[ED2, _])
-      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
+      (other: EdgeRDD[ED2])
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDDImpl[ED3, VD] = {
     val ed2Tag = classTag[ED2]
     val ed3Tag = classTag[ED3]
     this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
@@ -99,8 +99,8 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
     })
   }
 
-  override private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
-      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
+  def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDDImpl[ED2, VD2] = {
     this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
       if (iter.hasNext) {
         val (pid, ep) = iter.next()
@@ -111,13 +111,13 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
     }, preservesPartitioning = true))
   }
 
-  override private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
-      partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
+  private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
+      partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDDImpl[ED2, VD2] = {
     new EdgeRDDImpl(partitionsRDD, this.targetStorageLevel)
   }
 
   override private[graphx] def withTargetStorageLevel(
-      targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
+      targetStorageLevel: StorageLevel): EdgeRDDImpl[ED, VD] = {
     new EdgeRDDImpl(this.partitionsRDD, targetStorageLevel)
   }
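
After the change, `innerJoin` accepts any co-partitioned EdgeRDD; the hidden vertex types of the two operands no longer have to agree. A minimal sketch:

    import org.apache.spark.graphx._

    // Sum the weights of edges present in both sets (assumes both sides use
    // the same partitioning, as innerJoin requires).
    def sumWeights(a: EdgeRDD[Double], b: EdgeRDD[Double]): EdgeRDD[Double] =
      a.innerJoin(b) { (src, dst, w1, w2) => w1 + w2 }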
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9ac2bb18/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 2b4636a..0eae2a6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -43,7 +43,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   /** Default constructor is provided to support serialization */
   protected def this() = this(null, null)
 
-  @transient override val edges: EdgeRDD[ED, VD] = replicatedVertexView.edges
+  @transient override val edges: EdgeRDDImpl[ED, VD] = replicatedVertexView.edges
 
   /** Return a RDD that brings edges together with their source and destination vertices. */
   @transient override lazy val triplets: RDD[EdgeTriplet[VD, ED]] = {
@@ -323,9 +323,10 @@ object GraphImpl {
    */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
-      edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
+      edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
     // Convert the vertex partitions in edges to the correct type
-    val newEdges = edges.mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
+    val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
+      .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
     GraphImpl.fromExistingRDDs(vertices, newEdges)
   }
 
@@ -336,8 +337,8 @@ object GraphImpl {
    */
   def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
-      edges: EdgeRDD[ED, VD]): GraphImpl[VD, ED] = {
-    new GraphImpl(vertices, new ReplicatedVertexView(edges))
+      edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
+    new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]]))
   }
 
   /**
@@ -345,7 +346,7 @@ object GraphImpl {
    * `defaultVertexAttr`. The vertices will have the same number of partitions as the EdgeRDD.
    */
   private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
-      edges: EdgeRDD[ED, VD],
+      edges: EdgeRDDImpl[ED, VD],
       defaultVertexAttr: VD,
       edgeStorageLevel: StorageLevel,
       vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
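
Since the public EdgeRDD no longer carries VD, GraphImpl recovers it by downcasting to EdgeRDDImpl, which is safe as long as EdgeRDDImpl is the only concrete subclass. A stripped-down sketch of the pattern with hypothetical types:

    // The public type hides an internal parameter; the implementation
    // restores it with a cast.
    abstract class Pub[A]
    class Impl[A, B](val hidden: Seq[B]) extends Pub[A]

    // Safe while Impl is the only concrete Pub.
    def internalSize[A](p: Pub[A]): Int =
      p.asInstanceOf[Impl[A, _]].hidden.size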

http://git-wip-us.apache.org/repos/asf/spark/blob/9ac2bb18/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index 86b366e..8ab255b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -33,7 +33,7 @@ import org.apache.spark.graphx._
  */
 private[impl]
 class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
-    var edges: EdgeRDD[ED, VD],
+    var edges: EdgeRDDImpl[ED, VD],
     var hasSrcId: Boolean = false,
     var hasDstId: Boolean = false) {
 
@@ -42,7 +42,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
    * shipping level.
    */
   def withEdges[VD2: ClassTag, ED2: ClassTag](
-      edges_ : EdgeRDD[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = {
+      edges_ : EdgeRDDImpl[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = {
     new ReplicatedVertexView(edges_, hasSrcId, hasDstId)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9ac2bb18/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 0840562..d92a55a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -172,7 +172,7 @@ class VertexRDDImpl[VD] private[graphx] (
   override def reverseRoutingTables(): VertexRDD[VD] =
     this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
 
-  override def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
+  override def withEdges(edges: EdgeRDD[_]): VertexRDD[VD] = {
     val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get)
     val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
       (partIter, routingTableIter) =>

