You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:59:48 UTC

[24/50] git commit: Miscel doc update.

Miscel doc update.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/02a8f54b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/02a8f54b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/02a8f54b

Branch: refs/heads/master
Commit: 02a8f54bfa4572908d2d605a85e7a5adf9a36fbc
Parents: dc041cd
Author: Reynold Xin <rx...@apache.org>
Authored: Mon Jan 13 17:40:36 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Jan 13 17:40:36 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/Edge.scala    |  2 +-
 .../org/apache/spark/graphx/EdgeTriplet.scala   |  4 +-
 .../scala/org/apache/spark/graphx/Graph.scala   | 75 +++++++---------
 .../org/apache/spark/graphx/GraphLoader.scala   | 14 ++-
 .../apache/spark/graphx/PartitionStrategy.scala |  1 +
 .../scala/org/apache/spark/graphx/Pregel.scala  | 14 +--
 .../spark/graphx/impl/EdgePartition.scala       |  7 +-
 .../apache/spark/graphx/impl/GraphImpl.scala    | 15 ++--
 .../org/apache/spark/graphx/lib/Analytics.scala | 12 +--
 .../spark/graphx/lib/ConnectedComponents.scala  |  2 +-
 .../org/apache/spark/graphx/lib/PageRank.scala  |  2 +-
 .../apache/spark/graphx/lib/SVDPlusPlus.scala   | 95 +++++++++++++-------
 .../lib/StronglyConnectedComponents.scala       |  1 +
 .../apache/spark/graphx/lib/TriangleCount.scala | 43 +++++----
 .../scala/org/apache/spark/graphx/package.scala |  5 +-
 .../spark/graphx/util/BytecodeUtils.scala       |  7 +-
 .../collection/PrimitiveKeyOpenHashMap.scala    |  2 +-
 17 files changed, 158 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
index 85f27d2..6c396c3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
@@ -13,7 +13,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
     /** The vertex id of the target vertex. */
     var dstId: VertexID = 0,
     /** The attribute associated with the edge. */
-    var attr: ED = nullValue[ED])
+    var attr: ED = null.asInstanceOf[ED])
   extends Serializable {
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index 057d63a..4253b24 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -1,7 +1,5 @@
 package org.apache.spark.graphx
 
-import org.apache.spark.graphx.impl.VertexPartition
-
 /**
  * An edge triplet represents an edge along with the vertex attributes of its neighboring vertices.
  *
@@ -47,5 +45,5 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
   def vertexAttr(vid: VertexID): VD =
     if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
 
-  override def toString() = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+  override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 7d4f0de..d2ba6fd 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -45,7 +45,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
 
   /**
    * An RDD containing the edge triplets, which are edges along with the vertex data associated with
-   * the adjacent vertices.
+   * the adjacent vertices. The caller should use [[edges]] if the vertex data are not needed, i.e.
+   * if only the edge data and adjacent vertex ids are needed.
    *
    * @return an RDD containing edge triplets
    *
@@ -54,13 +55,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * different color.
    * {{{
    * type Color = Int
-   * val graph: Graph[Color, Int] = Graph.textFile("hdfs://file.tsv")
+   * val graph: Graph[Color, Int] = GraphLoader.edgeListFile("hdfs://file.tsv")
    * val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
    * }}}
-   *
-   * @see `edges` if only the edge data and adjacent vertex ids are
-   * required.
-   *
    */
   val triplets: RDD[EdgeTriplet[VD, ED]]
 
@@ -68,9 +65,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * Caches the vertices and edges associated with this graph at the specified storage level.
    *
    * @param newLevel the level at which to cache the graph.
-
-   * @return A reference to this graph for convenience.
    *
+   * @return A reference to this graph for convenience.
    */
   def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
 
@@ -159,8 +155,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * @tparam ED2 the new edge data type
    *
    */
-  def mapEdges[ED2: ClassTag](
-      map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
+  def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2])
+    : Graph[VD, ED2]
 
   /**
    * Transforms each edge attribute using the map function, passing it the adjacent vertex attributes
@@ -203,9 +199,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * @tparam ED2 the new edge data type
    *
    */
-  def mapTriplets[ED2: ClassTag](
-      map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
-    Graph[VD, ED2]
+  def mapTriplets[ED2: ClassTag](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
+    : Graph[VD, ED2]
 
   /**
    * Reverses all edges in the graph.  If this graph contains an edge from a to b then the returned
@@ -233,8 +228,10 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * @return the subgraph containing only the vertices and edges that
    * satisfy the predicates
    */
-  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
-    vpred: (VertexID, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
+  def subgraph(
+      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
+      vpred: (VertexID, VD) => Boolean = ((v, d) => true))
+    : Graph[VD, ED]
 
   /**
    * Restricts the graph to only the vertices and edges that are also in `other`, but keeps the
@@ -249,14 +246,12 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * Merges multiple edges between two vertices into a single edge. For correct results, the graph
    * must have been partitioned using [[partitionBy]].
    *
-   * @tparam ED2 the type of the resulting edge data after grouping.
-   *
-   * @param f the user-supplied commutative associative function to merge edge attributes for
-   * duplicate edges.
+   * @param merge the user-supplied commutative associative function to merge edge attributes
+   *              for duplicate edges.
    *
    * @return The resulting graph with a single edge for each (source, dest) vertex pair.
    */
-  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
+  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
 
   /**
    * Computes statistics about the neighboring edges and vertices of each vertex.  The user supplied
@@ -270,7 +265,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * more messages to neighboring vertices
    *
    * @param reduceFunc the user defined reduce function which should
-   * be commutative and assosciative and is used to combine the output
+   * be commutative and associative and is used to combine the output
    * of the map phase
    *
    * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to consider
@@ -301,21 +296,20 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
 
   /**
    * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`.  The
-   * input table should contain at most one entry for each vertex.  If no entry in `table` is
+   * input table should contain at most one entry for each vertex.  If no entry in `other` is
    * provided for a particular vertex in the graph, the map function receives `None`.
    *
    * @tparam U the type of entry in the table of updates
    * @tparam VD2 the new vertex value type
    *
-   * @param table the table to join with the vertices in the graph.
-   * The table should contain at most one entry for each vertex.
-   *
-   * @param mapFunc the function used to compute the new vertex
-   * values.  The map function is invoked for all vertices, even those
-   * that do not have a corresponding entry in the table.
+   * @param other the table to join with the vertices in the graph.
+   *              The table should contain at most one entry for each vertex.
+   * @param mapFunc the function used to compute the new vertex values.
+   *                The map function is invoked for all vertices, even those
+   *                that do not have a corresponding entry in the table.
    *
    * @example This function is used to update the vertices with new values based on external data.
-   * For example we could add the out-degree to each vertex record:
+   *          For example we could add the out-degree to each vertex record:
    *
    * {{{
    * val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
@@ -324,20 +318,20 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    *   (vid, data, optDeg) => optDeg.getOrElse(0)
    * }
    * }}}
-   *
    */
-  def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(VertexID, U)])
+  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
       (mapFunc: (VertexID, VD, Option[U]) => VD2)
     : Graph[VD2, ED]
 
+  /**
+   * The associated [[GraphOps]] object.
+   */
   // Save a copy of the GraphOps object so there is always one unique GraphOps object
   // for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
   val ops = new GraphOps(this)
 } // end of Graph
 
 
-
-
 /**
  * The Graph object contains a collection of routines used to construct graphs from RDDs.
  */
@@ -357,7 +351,8 @@ object Graph {
   def fromEdgeTuples[VD: ClassTag](
       rawEdges: RDD[(VertexID, VertexID)],
       defaultValue: VD,
-      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = {
+      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
+  {
     val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
     val graph = GraphImpl(edges, defaultValue)
     uniqueEdges match {
@@ -391,10 +386,8 @@ object Graph {
    * @tparam ED the edge attribute type
    * @param vertices the "set" of vertices and their attributes
    * @param edges the collection of edges in the graph
-   * @param defaultVertexAttr the default vertex attribute to use for
-   * vertices that are mentioned in edges but not in vertices
-   * @param partitionStrategy the partition strategy to use when
-   * partitioning the edges
+   * @param defaultVertexAttr the default vertex attribute to use for vertices that are
+   *                          mentioned in edges but not in vertices
    */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexID, VD)],
@@ -406,9 +399,9 @@ object Graph {
   /**
    * Implicitly extracts the [[GraphOps]] member from a graph.
    *
-   * To improve modularity the Graph type only contains a small set of basic operations.  All the
-   * convenience operations are defined in the [[GraphOps]] class which may be shared across multiple
-   * graph implementations.
+   * To improve modularity the Graph type only contains a small set of basic operations.
+   * All the convenience operations are defined in the [[GraphOps]] class which may be
+   * shared across multiple graph implementations.
    */
   implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]) = g.ops
 } // end of Graph object

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 3c06a40..7bdb101 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -1,12 +1,7 @@
 package org.apache.spark.graphx
 
-import java.util.{Arrays => JArrays}
-import scala.reflect.ClassTag
-
-import org.apache.spark.graphx.impl.EdgePartitionBuilder
 import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.graphx.impl.{EdgePartition, GraphImpl}
-import org.apache.spark.util.collection.PrimitiveVector
+import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
 
 /**
  * Provides utilities for loading [[Graph]]s from files.
@@ -31,19 +26,20 @@ object GraphLoader extends Logging {
    * 1    8
    * }}}
    *
-   * @param sc
+   * @param sc SparkContext
    * @param path the path to the file (e.g., /home/data/file or hdfs://file)
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
    * @param minEdgePartitions the number of partitions for the
    *        the edge RDD
-   * @tparam ED
    */
   def edgeListFile(
       sc: SparkContext,
       path: String,
       canonicalOrientation: Boolean = false,
-      minEdgePartitions: Int = 1): Graph[Int, Int] = {
+      minEdgePartitions: Int = 1)
+    : Graph[Int, Int] =
+  {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index fc7635a..b9ccd87 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -5,6 +5,7 @@ package org.apache.spark.graphx
  * vertex IDs.
  */
 sealed trait PartitionStrategy extends Serializable {
+  /** Returns the partition number for a given edge. */
   def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 83e28d0..ce4eb53 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -89,14 +89,16 @@ object Pregel {
    *
    */
   def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
-    (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue,
-      activeDirection: EdgeDirection = EdgeDirection.Out)(
-      vprog: (VertexID, VD, A) => VD,
+     (graph: Graph[VD, ED],
+      initialMsg: A,
+      maxIterations: Int = Int.MaxValue,
+      activeDirection: EdgeDirection = EdgeDirection.Out)
+     (vprog: (VertexID, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
       mergeMsg: (A, A) => A)
-    : Graph[VD, ED] = {
-
-    var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
+    : Graph[VD, ED] =
+  {
+    var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
     // compute the messages
     var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
     var activeMessages = messages.count()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index a03e73e..d4d7162 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -100,10 +100,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    */
   def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
     val builder = new EdgePartitionBuilder[ED]
-    var firstIter: Boolean = true
-    var currSrcId: VertexID = nullValue[VertexID]
-    var currDstId: VertexID = nullValue[VertexID]
-    var currAttr: ED = nullValue[ED]
+    var currSrcId: VertexID = null.asInstanceOf[VertexID]
+    var currDstId: VertexID = null.asInstanceOf[VertexID]
+    var currAttr: ED = null.asInstanceOf[ED]
     var i = 0
     while (i < size) {
       if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 6a2abc7..9e39519 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -249,8 +249,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 
     // For each vertex, replicate its attribute only to partitions where it is
     // in the relevant position in an edge.
-    val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
-    val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
+    val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
+    val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
     val vs = activeSetOpt match {
       case Some((activeSet, _)) =>
         replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
@@ -308,10 +308,12 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   } // end of mapReduceTriplets
 
   override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
-      (updates: RDD[(VertexID, U)])(updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] = {
+      (other: RDD[(VertexID, U)])
+      (updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] =
+  {
     if (classTag[VD] equals classTag[VD2]) {
       // updateF preserves type, so we can use incremental replication
-      val newVerts = vertices.leftJoin(updates)(updateF)
+      val newVerts = vertices.leftJoin(other)(updateF)
       val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
       val newReplicatedVertexView = new ReplicatedVertexView[VD2](
         changedVerts, edges, routingTable,
@@ -319,12 +321,13 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
       new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
     } else {
       // updateF does not preserve type, so we must re-replicate all vertices
-      val newVerts = vertices.leftJoin(updates)(updateF)
+      val newVerts = vertices.leftJoin(other)(updateF)
       GraphImpl(newVerts, edges, routingTable)
     }
   }
 
-  private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {
+  /** Test whether the closure accesses the the attribute with name `attrName`. */
+  private def accessesVertexAttr(closure: AnyRef, attrName: String): Boolean = {
     try {
       BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
     } catch {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index 8c35f42..d5e1de1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -37,14 +37,12 @@ object Analytics extends Logging {
       case "pagerank" =>
         var tol: Float = 0.001F
         var outFname = ""
-        var numVPart = 4
         var numEPart = 4
         var partitionStrategy: Option[PartitionStrategy] = None
 
         options.foreach{
           case ("tol", v) => tol = v.toFloat
           case ("output", v) => outFname = v
-          case ("numVPart", v) => numVPart = v.toInt
           case ("numEPart", v) => numEPart = v.toInt
           case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
@@ -90,16 +88,12 @@ object Analytics extends Logging {
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
 
-        if(!isDynamic && numIter == Int.MaxValue) {
+        if (!isDynamic && numIter == Int.MaxValue) {
           println("Set number of iterations!")
           sys.exit(1)
         }
         println("======================================")
         println("|      Connected Components          |")
-        println("--------------------------------------")
-        println(" Using parameters:")
-        println(" \tDynamic:  " + isDynamic)
-        println(" \tNumIter:  " + numIter)
         println("======================================")
 
         val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")", conf)
@@ -112,20 +106,18 @@ object Analytics extends Logging {
         sc.stop()
 
       case "triangles" =>
-        var numVPart = 4
         var numEPart = 4
         // TriangleCount requires the graph to be partitioned
         var partitionStrategy: PartitionStrategy = RandomVertexCut
 
         options.foreach{
           case ("numEPart", v) => numEPart = v.toInt
-          case ("numVPart", v) => numVPart = v.toInt
           case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
         println("======================================")
         println("|      Triangle Count                |")
-        println("--------------------------------------")
+        println("======================================")
         val sc = new SparkContext(host, "TriangleCount(" + fname + ")", conf)
         val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
           minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index d078d2a..da03d99 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -4,7 +4,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
 
-
+/** Connected components algorithm. */
 object ConnectedComponents {
   /**
    * Compute the connected component membership of each vertex and return a graph with the vertex

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index cf95267..853ef38 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -5,7 +5,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.Logging
 import org.apache.spark.graphx._
 
-
+/** PageRank algorithm implementation. */
 object PageRank extends Logging {
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index f5570da..fa6b1db 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -1,11 +1,12 @@
 package org.apache.spark.graphx.lib
 
-import org.apache.spark.rdd._
-import org.apache.spark.graphx._
 import scala.util.Random
 import org.apache.commons.math.linear._
+import org.apache.spark.rdd._
+import org.apache.spark.graphx._
 
-class SVDPlusPlusConf( // SVDPlusPlus parameters
+/** Configuration parameters for SVDPlusPlus. */
+class SVDPlusPlusConf(
   var rank: Int,
   var maxIters: Int,
   var minVal: Double,
@@ -15,11 +16,15 @@ class SVDPlusPlusConf( // SVDPlusPlus parameters
   var gamma6: Double,
   var gamma7: Double) extends Serializable
 
+/** Implementation of SVD++ algorithm. */
 object SVDPlusPlus {
   /**
-   * Implement SVD++ based on "Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model",
-   * paper is available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
-   * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)), see the details on page 6.
+   * Implement SVD++ based on "Factorization Meets the Neighborhood:
+   * a Multifaceted Collaborative Filtering Model",
+   * available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
+   *
+   * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)),
+   * see the details on page 6.
    *
    * @param edges edges for constructing the graph
    *
@@ -27,16 +32,16 @@ object SVDPlusPlus {
    *
    * @return a graph with vertex attributes containing the trained model
    */
-
-  def run(edges: RDD[Edge[Double]], conf: SVDPlusPlusConf): (Graph[(RealVector, RealVector, Double, Double), Double], Double) = {
-
-    // generate default vertex attribute
+  def run(edges: RDD[Edge[Double]], conf: SVDPlusPlusConf)
+    : (Graph[(RealVector, RealVector, Double, Double), Double], Double) =
+  {
+    // Generate default vertex attribute
     def defaultF(rank: Int): (RealVector, RealVector, Double, Double) = {
       val v1 = new ArrayRealVector(rank)
       val v2 = new ArrayRealVector(rank)
       for (i <- 0 until rank) {
-        v1.setEntry(i, Random.nextDouble)
-        v2.setEntry(i, Random.nextDouble)
+        v1.setEntry(i, Random.nextDouble())
+        v2.setEntry(i, Random.nextDouble())
       }
       (v1, v2, 0.0, 0.0)
     }
@@ -49,14 +54,18 @@ object SVDPlusPlus {
     // construct graph
     var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache()
 
-    // calculate initial bias and norm
-    var t0 = g.mapReduceTriplets(et =>
-      Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))), (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
-    g = g.outerJoinVertices(t0) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
-      (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
+    // Calculate initial bias and norm
+    val t0 = g.mapReduceTriplets(
+      et => Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))),
+      (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
+
+    g = g.outerJoinVertices(t0) {
+      (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
+        (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
     }
 
-    def mapTrainF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
+    def mapTrainF(conf: SVDPlusPlusConf, u: Double)
+        (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
       : Iterator[(VertexID, (RealVector, RealVector, Double))] = {
       val (usr, itm) = (et.srcAttr, et.dstAttr)
       val (p, q) = (usr._1, itm._1)
@@ -64,31 +73,49 @@ object SVDPlusPlus {
       pred = math.max(pred, conf.minVal)
       pred = math.min(pred, conf.maxVal)
       val err = et.attr - pred
-      val updateP = ((q.mapMultiply(err)).subtract(p.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
-      val updateQ = ((usr._2.mapMultiply(err)).subtract(q.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
-      val updateY = ((q.mapMultiply(err * usr._4)).subtract((itm._2).mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+      val updateP = q.mapMultiply(err)
+        .subtract(p.mapMultiply(conf.gamma7))
+        .mapMultiply(conf.gamma2)
+      val updateQ = usr._2.mapMultiply(err)
+        .subtract(q.mapMultiply(conf.gamma7))
+        .mapMultiply(conf.gamma2)
+      val updateY = q.mapMultiply(err * usr._4)
+        .subtract(itm._2.mapMultiply(conf.gamma7))
+        .mapMultiply(conf.gamma2)
       Iterator((et.srcId, (updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1)),
         (et.dstId, (updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1)))
     }
 
     for (i <- 0 until conf.maxIters) {
-      // phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
+      // Phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
       g.cache()
-      var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2))
-      g = g.outerJoinVertices(t1) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
-        if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
+      val t1 = g.mapReduceTriplets(
+        et => Iterator((et.srcId, et.dstAttr._2)),
+        (g1: RealVector, g2: RealVector) => g1.add(g2))
+      g = g.outerJoinVertices(t1) {
+        (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
+          if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
       }
-      // phase 2, update p for user nodes and q, y for item nodes
+
+      // Phase 2, update p for user nodes and q, y for item nodes
       g.cache()
-      val t2 = g.mapReduceTriplets(mapTrainF(conf, u), (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
-        (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
-      g = g.outerJoinVertices(t2) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) =>
-        (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
+      val t2 = g.mapReduceTriplets(
+        mapTrainF(conf, u),
+        (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
+          (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
+      g = g.outerJoinVertices(t2) {
+        (vid: VertexID,
+         vd: (RealVector, RealVector, Double, Double),
+         msg: Option[(RealVector, RealVector, Double)]) =>
+          (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
       }
     }
 
     // calculate error on training set
-    def mapTestF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(VertexID, Double)] = {
+    def mapTestF(conf: SVDPlusPlusConf, u: Double)
+        (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
+      : Iterator[(VertexID, Double)] =
+    {
       val (usr, itm) = (et.srcAttr, et.dstAttr)
       val (p, q) = (usr._1, itm._1)
       var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
@@ -99,9 +126,11 @@ object SVDPlusPlus {
     }
     g.cache()
     val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
-    g = g.outerJoinVertices(t3) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
-      if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
+    g = g.outerJoinVertices(t3) {
+      (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+        if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
     }
+
     (g, u)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
index 43c4b9c..1184750 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
@@ -4,6 +4,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
 
+/** Strongly connected components algorithm implementation. */
 object StronglyConnectedComponents {
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index 58da9e3..f87eab9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -4,27 +4,26 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
 
-
+/**
+ * Compute the number of triangles passing through each vertex.
+ *
+ * The algorithm is relatively straightforward and can be computed in three steps:
+ *
+ * 1) Compute the set of neighbors for each vertex
+ * 2) For each edge compute the intersection of the sets and send the
+ *    count to both vertices.
+ * 3) Compute the sum at each vertex and divide by two since each
+ *    triangle is counted twice.
+ *
+ * Note that the input graph should have its edges in canonical direction
+ * (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned
+ * using [[org.apache.spark.graphx.Graph#partitionBy]].
+ */
 object TriangleCount {
-  /**
-   * Compute the number of triangles passing through each vertex.
-   *
-   * The algorithm is relatively straightforward and can be computed in three steps:
-   *
-   * 1) Compute the set of neighbors for each vertex
-   * 2) For each edge compute the intersection of the sets and send the
-   *    count to both vertices.
-   * 3) Compute the sum at each vertex and divide by two since each
-   *    triangle is counted twice.
-   *
-   *
-   * @param graph a graph with `sourceId` less than `destId`. The graph must have been partitioned
-   * using [[org.apache.spark.graphx.Graph#partitionBy]], and its edges must be in canonical
-   * orientation (srcId < dstId).
-   */
+
   def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = {
     // Remove redundant edges
-    val g = graph.groupEdges((a, b) => a).cache
+    val g = graph.groupEdges((a, b) => a).cache()
 
     // Construct set representations of the neighborhoods
     val nbrSets: VertexRDD[VertexSet] =
@@ -56,8 +55,10 @@ object TriangleCount {
       val iter = smallSet.iterator
       var counter: Int = 0
       while (iter.hasNext) {
-        val vid = iter.next
-        if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) { counter += 1 }
+        val vid = iter.next()
+        if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) {
+          counter += 1
+        }
       }
       Iterator((et.srcId, counter), (et.dstId, counter))
     }
@@ -71,7 +72,5 @@ object TriangleCount {
         assert((dblCount & 1) == 0)
         dblCount / 2
     }
-
   } // end of TriangleCount
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
index e70d2fd..60dfc1d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/package.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
@@ -2,6 +2,7 @@ package org.apache.spark
 
 import org.apache.spark.util.collection.OpenHashSet
 
+/** GraphX is a graph processing framework built on top of Spark. */
 package object graphx {
   /**
    * A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
@@ -9,11 +10,9 @@ package object graphx {
    */
   type VertexID = Long
 
+  /** Integer identifer of a graph partition. */
   // TODO: Consider using Char.
   type PartitionID = Int
 
   private[graphx] type VertexSet = OpenHashSet[VertexID]
-
-  /** Returns the default null-like value for a data type T. */
-  private[graphx] def nullValue[T] = null.asInstanceOf[T]
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
index ec8d534..1c5b234 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
@@ -10,8 +10,11 @@ import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor}
 import org.objectweb.asm.Opcodes._
 
 
-
-private[spark] object BytecodeUtils {
+/**
+ * Includes an utility function to test whether a function accesses a specific attribute
+ * of an object.
+ */
+private[graphx] object BytecodeUtils {
 
   /**
    * Test whether the given closure invokes the specified method in the specified class.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02a8f54b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
index 1088944..7b02e2e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -28,7 +28,7 @@ import scala.reflect._
  *
  * Under the hood, it uses our OpenHashSet implementation.
  */
-private[spark]
+private[graphx]
 class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
                               @specialized(Long, Int, Double) V: ClassTag](
     val keySet: OpenHashSet[K], var _values: Array[V])