Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 08:00:00 UTC

[36/50] git commit: Remove Graph.statistics and GraphImpl.printLineage

Remove Graph.statistics and GraphImpl.printLineage


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d4d9ece1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d4d9ece1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d4d9ece1

Branch: refs/heads/master
Commit: d4d9ece1af258ccdc83afbb8ba26345d7af16422
Parents: ee8931d
Author: Ankur Dave <an...@gmail.com>
Authored: Mon Jan 13 18:45:46 2014 -0800
Committer: Ankur Dave <an...@gmail.com>
Committed: Mon Jan 13 21:02:37 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/Graph.scala   |  7 +--
 .../apache/spark/graphx/impl/GraphImpl.scala    | 65 --------------------
 .../org/apache/spark/graphx/lib/PageRank.scala  |  6 --
 3 files changed, 1 insertion(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d4d9ece1/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 1e3f389..7e99276 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -91,11 +91,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
   def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
 
   /**
-   * Computes statistics describing the graph representation.
-   */
-  def statistics: Map[String, Any]
-
-  /**
    * Transforms each vertex attribute in the graph using the map function.
    *
    * @note The new graph has the same structure.  As a consequence the underlying index structures
@@ -254,7 +249,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
   def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
 
   /**
-   * Computes statistics about the neighboring edges and vertices of each vertex.  The user supplied
+   * Aggregates values from the neighboring edges and vertices of each vertex.  The user supplied
    * `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be
    * "sent" to either vertex in the edge.  The `reduceFunc` is then used to combine the output of
    * the map phase destined to each vertex.
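
The reworded doc comment above describes GraphX's mapReduceTriplets aggregation pattern. As a minimal sketch of that pattern -- assuming a Graph[VD, ED] named `graph` and the mapReduceTriplets signature of this era -- the following computes each vertex's in-degree by sending the message 1 across every edge to its destination and summing on arrival:

    import org.apache.spark.graphx._

    // mapFunc: emit one message per edge, addressed to the destination vertex.
    // reduceFunc: sum all messages that arrive at the same vertex.
    val inDegrees: VertexRDD[Int] = graph.mapReduceTriplets[Int](
      triplet => Iterator((triplet.dstId, 1)),
      (a, b) => a + b
    )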

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d4d9ece1/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 348490c..12d46a8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -83,71 +83,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     GraphImpl(vertices, newEdges)
   }
 
-  override def statistics: Map[String, Any] = {
-    // Get the total number of vertices after replication, used to compute the replication ratio.
-    def numReplicatedVertices(vid2pids: RDD[Array[Array[VertexID]]]): Double = {
-      vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble
-    }
-
-    val numVertices = this.ops.numVertices
-    val numEdges = this.ops.numEdges
-    val replicationRatioBoth = numReplicatedVertices(routingTable.bothAttrs) / numVertices
-    val replicationRatioSrcOnly = numReplicatedVertices(routingTable.srcAttrOnly) / numVertices
-    val replicationRatioDstOnly = numReplicatedVertices(routingTable.dstAttrOnly) / numVertices
-    // One entry for each partition, indicating the total number of edges on that partition.
-    val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges)
-    val minLoad = loadArray.min
-    val maxLoad = loadArray.max
-    Map(
-      "Num Vertices" -> numVertices,
-      "Num Edges" -> numEdges,
-      "Replication (both)" -> replicationRatioBoth,
-      "Replication (src only)" -> replicationRatioSrcOnly,
-      "Replication (dest only)" -> replicationRatioDstOnly,
-      "Load Array" -> loadArray,
-      "Min Load" -> minLoad,
-      "Max Load" -> maxLoad)
-  }
-
-  /**
-   * Display the lineage information for this graph.
-   */
-  def printLineage() = {
-    def traverseLineage(
-        rdd: RDD[_],
-        indent: String = "",
-        visited: Map[Int, String] = Map.empty[Int, String]) {
-      if (visited.contains(rdd.id)) {
-        println(indent + visited(rdd.id))
-        println(indent)
-      } else {
-        val locs = rdd.partitions.map( p => rdd.preferredLocations(p) )
-        val cacheLevel = rdd.getStorageLevel
-        val name = rdd.id
-        val deps = rdd.dependencies
-        val partitioner = rdd.partitioner
-        val numparts = partitioner match { case Some(p) => p.numPartitions; case None => 0}
-        println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner +
-          ", " + numparts +")")
-        println(indent + " |--->  Deps:    " + deps.map(d => (d, d.rdd.id) ).toString)
-        println(indent + " |--->  PrefLoc: " + locs.map(x=> x.toString).mkString(", "))
-        deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited))
-      }
-    }
-    println("edges ------------------------------------------")
-    traverseLineage(edges, "  ")
-    var visited = Map(edges.id -> "edges")
-    println("\n\nvertices ------------------------------------------")
-    traverseLineage(vertices, "  ", visited)
-    visited += (vertices.id -> "vertices")
-    println("\n\nroutingTable.bothAttrs -------------------------------")
-    traverseLineage(routingTable.bothAttrs, "  ", visited)
-    visited += (routingTable.bothAttrs.id -> "routingTable.bothAttrs")
-    println("\n\ntriplets ----------------------------------------")
-    traverseLineage(triplets, "  ", visited)
-    println(visited)
-  } // end of printLineage
-
   override def reverse: Graph[VD, ED] = {
     val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
     new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
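
The removed statistics method reported vertex and edge counts plus the per-partition edge balance ("Load Array", "Min Load", "Max Load"). A hedged sketch of recovering the load figures from public APIs after this change, assuming a Graph named `graph` (the replication ratios depended on the internal routingTable and have no public equivalent):

    // Edges per partition from the public edges RDD, normalized by the total
    // edge count -- this mirrors the removed "Load Array" computation.
    val numEdges = graph.numEdges
    val loadArray = graph.edges
      .mapPartitions(iter => Iterator(iter.size.toLong))
      .collect()
      .map(_.toDouble / numEdges)
    println("Min Load: " + loadArray.min + ", Max Load: " + loadArray.max)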

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d4d9ece1/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 08256dc..2f4d6d6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -73,9 +73,6 @@ object PageRank extends Logging {
       .mapVertices( (id, attr) => 1.0 )
       .cache()
 
-    // Display statistics about pagerank
-    logInfo(pagerankGraph.statistics.toString)
-
     // Define the three functions needed to implement PageRank in the GraphX
     // version of Pregel
     def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
@@ -121,9 +118,6 @@ object PageRank extends Logging {
       .mapVertices( (id, attr) => (0.0, 0.0) )
       .cache()
 
-    // Display statistics about pagerank
-    logInfo(pagerankGraph.statistics.toString)
-
     // Define the three functions needed to implement PageRank in the GraphX
     // version of Pregel
     def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = {
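
The hunks above drop the statistics logging but leave the Pregel functions intact. For reference, a hedged, illustrative sketch of such a triple for the plain (non-delta) PageRank variant -- assuming `resetProb: Double` is in scope and edge attributes hold normalized out-edge weights; the exact bodies in PageRank.scala may differ:

    // vertexProgram: apply the damped update to each vertex's rank.
    def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
      resetProb + (1.0 - resetProb) * msgSum

    // sendMessage: propagate weighted rank along each out-edge.
    def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexID, Double)] =
      Iterator((edge.dstId, edge.srcAttr * edge.attr))

    // messageCombiner: accumulate incoming rank contributions.
    def messageCombiner(a: Double, b: Double): Double = a + b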