You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/04/24 07:01:16 UTC

git commit: Mark all fields of EdgePartition, Graph, and GraphOps transient

Repository: spark
Updated Branches:
  refs/heads/master d485eecb7 -> 1d6abe3a4


Mark all fields of EdgePartition, Graph, and GraphOps transient

These classes are Serializable only to work around closure capture (closures that reference them drag the whole object along), so their fields should all be marked `@transient` to avoid wasteful serialization.

This PR supersedes apache/spark#519 and fixes the same bug.

Author: Ankur Dave <an...@gmail.com>

Closes #520 from ankurdave/graphx-transient and squashes the following commits:

6431760 [Ankur Dave] Mark all fields of EdgePartition, Graph, and GraphOps `@transient`


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d6abe3a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d6abe3a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d6abe3a

Branch: refs/heads/master
Commit: 1d6abe3a4b58f28fc4e0e690e02c19b2568ce1ee
Parents: d485eec
Author: Ankur Dave <an...@gmail.com>
Authored: Wed Apr 23 22:01:13 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Wed Apr 23 22:01:13 2014 -0700

----------------------------------------------------------------------
 graphx/src/main/scala/org/apache/spark/graphx/Graph.scala |  6 +++---
 .../src/main/scala/org/apache/spark/graphx/GraphOps.scala | 10 +++++-----
 .../org/apache/spark/graphx/impl/EdgePartition.scala      |  8 ++++----
 3 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1d6abe3a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 4534969..5039586 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -46,7 +46,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * @note vertex ids are unique.
    * @return an RDD containing the vertices in this graph
    */
-  val vertices: VertexRDD[VD]
+  @transient val vertices: VertexRDD[VD]
 
   /**
    * An RDD containing the edges and their associated attributes.  The entries in the RDD contain
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * along with their vertex data.
    *
    */
-  val edges: EdgeRDD[ED]
+  @transient val edges: EdgeRDD[ED]
 
   /**
    * An RDD containing the edge triplets, which are edges along with the vertex data associated with
@@ -77,7 +77,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
    * }}}
    */
-  val triplets: RDD[EdgeTriplet[VD, ED]]
+  @transient val triplets: RDD[EdgeTriplet[VD, ED]]
 
   /**
    * Caches the vertices and edges associated with this graph at the specified storage level.

http://git-wip-us.apache.org/repos/asf/spark/blob/1d6abe3a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 5635287..4997fbc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -34,28 +34,28 @@ import scala.util.Random
 class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
 
   /** The number of edges in the graph. */
-  lazy val numEdges: Long = graph.edges.count()
+  @transient lazy val numEdges: Long = graph.edges.count()
 
   /** The number of vertices in the graph. */
-  lazy val numVertices: Long = graph.vertices.count()
+  @transient lazy val numVertices: Long = graph.vertices.count()
 
   /**
    * The in-degree of each vertex in the graph.
    * @note Vertices with no in-edges are not returned in the resulting RDD.
    */
-  lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
+  @transient lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
 
   /**
    * The out-degree of each vertex in the graph.
    * @note Vertices with no out-edges are not returned in the resulting RDD.
    */
-  lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
+  @transient lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
 
   /**
    * The degree of each vertex in the graph.
    * @note Vertices with no edges are not returned in the resulting RDD.
    */
-  lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
+  @transient lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
 
   /**
    * Computes the neighboring vertex degrees.

http://git-wip-us.apache.org/repos/asf/spark/blob/1d6abe3a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 2e05f5d..b7c472e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -34,10 +34,10 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
  */
 private[graphx]
 class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
-    val srcIds: Array[VertexId],
-    val dstIds: Array[VertexId],
-    val data: Array[ED],
-    val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
+    @transient val srcIds: Array[VertexId],
+    @transient val dstIds: Array[VertexId],
+    @transient val data: Array[ED],
+    @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
 
   /**
    * Reverse all the edges in this partition.