You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/08 20:31:50 UTC

spark git commit: [SPARK-6765] Fix test code style for graphx.

Repository: spark
Updated Branches:
  refs/heads/master 9d44ddce1 -> 8d812f998


[SPARK-6765] Fix test code style for graphx.

So we can turn style checker on for test code.

Author: Reynold Xin <rx...@databricks.com>

Closes #5410 from rxin/test-style-graphx and squashes the following commits:

89e253a [Reynold Xin] [SPARK-6765] Fix test code style for graphx.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d812f99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d812f99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d812f99

Branch: refs/heads/master
Commit: 8d812f9986f2edf420a18ca822711c9765f480e2
Parents: 9d44ddc
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Apr 8 11:31:48 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Apr 8 11:31:48 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/graphx/GraphSuite.scala    | 71 +++++++++++---------
 .../apache/spark/graphx/LocalSparkContext.scala |  2 +-
 .../apache/spark/graphx/VertexRDDSuite.scala    | 26 +++----
 .../graphx/lib/ConnectedComponentsSuite.scala   | 18 ++---
 .../apache/spark/graphx/lib/PageRankSuite.scala | 33 ++++-----
 .../lib/StronglyConnectedComponentsSuite.scala  | 23 ++++---
 6 files changed, 88 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d812f99/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 8d15150..a570e4e 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -38,12 +38,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val doubleRing = ring ++ ring
       val graph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1)
       assert(graph.edges.count() === doubleRing.size)
-      assert(graph.edges.collect.forall(e => e.attr == 1))
+      assert(graph.edges.collect().forall(e => e.attr == 1))
 
       // uniqueEdges option should uniquify edges and store duplicate count in edge attributes
       val uniqueGraph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1, Some(RandomVertexCut))
       assert(uniqueGraph.edges.count() === ring.size)
-      assert(uniqueGraph.edges.collect.forall(e => e.attr == 2))
+      assert(uniqueGraph.edges.collect().forall(e => e.attr == 2))
     }
   }
 
@@ -64,7 +64,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       assert( graph.edges.count() === rawEdges.size )
       // Vertices not explicitly provided but referenced by edges should be created automatically
       assert( graph.vertices.count() === 100)
-      graph.triplets.collect.map { et =>
+      graph.triplets.collect().map { et =>
         assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
         assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr))
       }
@@ -75,15 +75,17 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     withSpark { sc =>
       val n = 5
       val star = starGraph(sc, n)
-      assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
-        (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
+      assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect().toSet
+        === (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
     }
   }
 
   test("partitionBy") {
     withSpark { sc =>
-      def mkGraph(edges: List[(Long, Long)]) = Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
-      def nonemptyParts(graph: Graph[Int, Int]) = {
+      def mkGraph(edges: List[(Long, Long)]): Graph[Int, Int] = {
+        Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
+      }
+      def nonemptyParts(graph: Graph[Int, Int]): RDD[List[Edge[Int]]] = {
         graph.edges.partitionsRDD.mapPartitions { iter =>
           Iterator(iter.next()._2.iterator.toList)
         }.filter(_.nonEmpty)
@@ -102,7 +104,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1)
       // partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
       // the same partition
-      assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
+      assert(
+        nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
       // partitionBy(EdgePartition2D) puts identical edges in the same partition
       assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1)
 
@@ -140,10 +143,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val g = Graph(
         sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
         sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
-      assert(g.triplets.collect.map(_.toTuple).toSet ===
+      assert(g.triplets.collect().map(_.toTuple).toSet ===
         Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
       val gPart = g.partitionBy(EdgePartition2D)
-      assert(gPart.triplets.collect.map(_.toTuple).toSet ===
+      assert(gPart.triplets.collect().map(_.toTuple).toSet ===
         Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
     }
   }
@@ -154,10 +157,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val star = starGraph(sc, n)
       // mapVertices preserving type
       val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
-      assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
+      assert(mappedVAttrs.vertices.collect().toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
       // mapVertices changing type
       val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
-      assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, 1)).toSet)
+      assert(mappedVAttrs2.vertices.collect().toSet === (0 to n).map(x => (x: VertexId, 1)).toSet)
     }
   }
 
@@ -177,12 +180,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       // Trigger initial vertex replication
       graph0.triplets.foreach(x => {})
       // Change type of replicated vertices, but preserve erased type
-      val graph1 = graph0.mapVertices {
-        case (vid, integerOpt) => integerOpt.map((x: java.lang.Integer) => (x.toDouble): java.lang.Double)
+      val graph1 = graph0.mapVertices { case (vid, integerOpt) =>
+        integerOpt.map((x: java.lang.Integer) => x.toDouble: java.lang.Double)
       }
       // Access replicated vertices, exposing the erased type
       val graph2 = graph1.mapTriplets(t => t.srcAttr.get)
-      assert(graph2.edges.map(_.attr).collect.toSet === Set[java.lang.Double](1.0, 2.0, 3.0))
+      assert(graph2.edges.map(_.attr).collect().toSet === Set[java.lang.Double](1.0, 2.0, 3.0))
     }
   }
 
@@ -202,7 +205,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     withSpark { sc =>
       val n = 5
       val star = starGraph(sc, n)
-      assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect.toSet ===
+      assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect().toSet ===
         (1L to n).map(x => Edge(0, x, "vv")).toSet)
     }
   }
@@ -211,7 +214,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     withSpark { sc =>
       val n = 5
       val star = starGraph(sc, n)
-      assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexId, 1)).toSet)
+      assert(star.reverse.outDegrees.collect().toSet === (1 to n).map(x => (x: VertexId, 1)).toSet)
     }
   }
 
@@ -221,7 +224,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
       val graph = Graph(vertices, edges).reverse
       val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
-      assert(result.collect.toSet === Set((1L, 2)))
+      assert(result.collect().toSet === Set((1L, 2)))
     }
   }
 
@@ -237,7 +240,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet)
 
       // And 4 edges.
-      assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
+      assert(subgraph.edges.map(_.copy()).collect().toSet ===
+        (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
     }
   }
 
@@ -273,9 +277,9 @@ class GraphSuite extends FunSuite with LocalSparkContext {
         sc.parallelize((1 to n).flatMap(x =>
           List((0: VertexId, x: VertexId), (0: VertexId, x: VertexId))), 1), "v")
       val star2 = doubleStar.groupEdges { (a, b) => a}
-      assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
-        star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
-      assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
+      assert(star2.edges.collect().toArray.sorted(Edge.lexicographicOrdering[Int]) ===
+        star.edges.collect().toArray.sorted(Edge.lexicographicOrdering[Int]))
+      assert(star2.vertices.collect().toSet === star.vertices.collect().toSet)
     }
   }
 
@@ -300,21 +304,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
           throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
         }
         Iterator((et.srcId, 1))
-      }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
+      }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect().toSet
       assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)
 
       // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
-      val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x+1) % n: VertexId)), 3)
+      val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x + 1) % n: VertexId)), 3)
       val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache()
       val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
-      val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
+      val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) =>
+        newOpt.getOrElse(old)
+      }
       val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
         // Map function should only run on edges with source in the active set
         if (et.srcId % 2 != 1) {
           throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
         }
         Iterator((et.dstId, 1))
-      }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
+      }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect().toSet
       assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)
 
     }
@@ -340,17 +346,18 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val n = 5
       val reverseStar = starGraph(sc, n).reverse.cache()
       // outerJoinVertices changing type
-      val reverseStarDegrees =
-        reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
+      val reverseStarDegrees = reverseStar.outerJoinVertices(reverseStar.outDegrees) {
+        (vid, a, bOpt) => bOpt.getOrElse(0)
+      }
       val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
         et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
-        (a: Int, b: Int) => a + b).collect.toSet
+        (a: Int, b: Int) => a + b).collect().toSet
       assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
       // outerJoinVertices preserving type
       val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
       val newReverseStar =
         reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
-      assert(newReverseStar.vertices.map(_._2).collect.toSet ===
+      assert(newReverseStar.vertices.map(_._2).collect().toSet ===
         (0 to n).map(x => "v%d".format(x)).toSet)
     }
   }
@@ -361,7 +368,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2)
       val graph = Graph(verts, edges)
       val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr))
-        .collect.toSet
+        .collect().toSet
       assert(triplets ===
         Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a")))
     }
@@ -417,7 +424,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val graph = Graph.fromEdgeTuples(edges, 1)
       val neighborAttrSums = graph.mapReduceTriplets[Int](
         et => Iterator((et.dstId, et.srcAttr)), _ + _)
-      assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
+      assert(neighborAttrSums.collect().toSet === Set((0: VertexId, n)))
     } finally {
       sc.stop()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8d812f99/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
index a3e28ef..d2ad9be 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
@@ -26,7 +26,7 @@ import org.apache.spark.SparkContext
 */
 trait LocalSparkContext {
   /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
-  def withSpark[T](f: SparkContext => T) = {
+  def withSpark[T](f: SparkContext => T): T = {
     val conf = new SparkConf()
     GraphXUtils.registerKryoClasses(conf)
     val sc = new SparkContext("local", "test", conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/8d812f99/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index c9443d1..d0a7198 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.StorageLevel
 
 class VertexRDDSuite extends FunSuite with LocalSparkContext {
 
-  def vertices(sc: SparkContext, n: Int) = {
+  private def vertices(sc: SparkContext, n: Int) = {
     VertexRDD(sc.parallelize((0 to n).map(x => (x.toLong, x)), 5))
   }
 
@@ -52,7 +52,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
       val vertexA = VertexRDD(sc.parallelize(0 until 75, 2).map(i => (i.toLong, 0))).cache()
       val vertexB = VertexRDD(sc.parallelize(25 until 100, 2).map(i => (i.toLong, 1))).cache()
       val vertexC = vertexA.minus(vertexB)
-      assert(vertexC.map(_._1).collect.toSet === (0 until 25).toSet)
+      assert(vertexC.map(_._1).collect().toSet === (0 until 25).toSet)
     }
   }
 
@@ -62,7 +62,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
       val vertexB: RDD[(VertexId, Int)] =
         sc.parallelize(25 until 100, 2).map(i => (i.toLong, 1)).cache()
       val vertexC = vertexA.minus(vertexB)
-      assert(vertexC.map(_._1).collect.toSet === (0 until 25).toSet)
+      assert(vertexC.map(_._1).collect().toSet === (0 until 25).toSet)
     }
   }
 
@@ -72,7 +72,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
       val vertexB = VertexRDD(sc.parallelize(50 until 100, 2).map(i => (i.toLong, 1)))
       assert(vertexA.partitions.size != vertexB.partitions.size)
       val vertexC = vertexA.minus(vertexB)
-      assert(vertexC.map(_._1).collect.toSet === (0 until 50).toSet)
+      assert(vertexC.map(_._1).collect().toSet === (0 until 50).toSet)
     }
   }
 
@@ -106,7 +106,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
       val vertexB = VertexRDD(sc.parallelize(8 until 16, 2).map(i => (i.toLong, 1)))
       assert(vertexA.partitions.size != vertexB.partitions.size)
       val vertexC = vertexA.diff(vertexB)
-      assert(vertexC.map(_._1).collect.toSet === (8 until 16).toSet)
+      assert(vertexC.map(_._1).collect().toSet === (8 until 16).toSet)
     }
   }
 
@@ -116,11 +116,11 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
       val verts = vertices(sc, n).cache()
       val evens = verts.filter(q => ((q._2 % 2) == 0)).cache()
       // leftJoin with another VertexRDD
-      assert(verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
+      assert(verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect().toSet ===
         (0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
       // leftJoin with an RDD
       val evensRDD = evens.map(identity)
-      assert(verts.leftJoin(evensRDD) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
+      assert(verts.leftJoin(evensRDD) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect().toSet ===
         (0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
     }
   }
@@ -134,7 +134,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
       val vertexC = vertexA.leftJoin(vertexB) { (vid, old, newOpt) =>
         old - newOpt.getOrElse(0)
       }
-      assert(vertexC.filter(v => v._2 != 0).map(_._1).collect.toSet == (1 to 99 by 2).toSet)
+      assert(vertexC.filter(v => v._2 != 0).map(_._1).collect().toSet == (1 to 99 by 2).toSet)
     }
   }
 
@@ -144,11 +144,11 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
       val verts = vertices(sc, n).cache()
       val evens = verts.filter(q => ((q._2 % 2) == 0)).cache()
       // innerJoin with another VertexRDD
-      assert(verts.innerJoin(evens) { (id, a, b) => a - b }.collect.toSet ===
+      assert(verts.innerJoin(evens) { (id, a, b) => a - b }.collect().toSet ===
         (0 to n by 2).map(x => (x.toLong, 0)).toSet)
       // innerJoin with an RDD
       val evensRDD = evens.map(identity)
-      assert(verts.innerJoin(evensRDD) { (id, a, b) => a - b }.collect.toSet ===
+      assert(verts.innerJoin(evensRDD) { (id, a, b) => a - b }.collect().toSet ===
         (0 to n by 2).map(x => (x.toLong, 0)).toSet)    }
   }
 
@@ -161,7 +161,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
       val vertexC = vertexA.innerJoin(vertexB) { (vid, old, newVal) =>
         old - newVal
       }
-      assert(vertexC.filter(v => v._2 == 0).map(_._1).collect.toSet == (0 to 98 by 2).toSet)
+      assert(vertexC.filter(v => v._2 == 0).map(_._1).collect().toSet == (0 to 98 by 2).toSet)
     }
   }
 
@@ -171,7 +171,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
       val verts = vertices(sc, n)
       val messageTargets = (0 to n) ++ (0 to n by 2)
       val messages = sc.parallelize(messageTargets.map(x => (x.toLong, 1)))
-      assert(verts.aggregateUsingIndex[Int](messages, _ + _).collect.toSet ===
+      assert(verts.aggregateUsingIndex[Int](messages, _ + _).collect().toSet ===
         (0 to n).map(x => (x.toLong, if (x % 2 == 0) 2 else 1)).toSet)
     }
   }
@@ -183,7 +183,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
       val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
       val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
       // test merge function
-      assert(rdd.collect.toSet == Set((0L, 0), (1L, 3), (2L, 9)))
+      assert(rdd.collect().toSet == Set((0L, 0), (1L, 3), (2L, 9)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8d812f99/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
index 3915be1..4cc30a9 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -32,7 +32,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
     withSpark { sc =>
       val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
       val ccGraph = gridGraph.connectedComponents()
-      val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+      val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum()
       assert(maxCCid === 0)
     }
   } // end of Grid connected components
@@ -42,7 +42,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
     withSpark { sc =>
       val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse
       val ccGraph = gridGraph.connectedComponents()
-      val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+      val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum()
       assert(maxCCid === 0)
     }
   } // end of Grid connected components
@@ -50,8 +50,8 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
 
   test("Chain Connected Components") {
     withSpark { sc =>
-      val chain1 = (0 until 9).map(x => (x, x+1) )
-      val chain2 = (10 until 20).map(x => (x, x+1) )
+      val chain1 = (0 until 9).map(x => (x, x + 1))
+      val chain2 = (10 until 20).map(x => (x, x + 1))
       val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
       val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0)
       val ccGraph = twoChains.connectedComponents()
@@ -73,12 +73,12 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
 
   test("Reverse Chain Connected Components") {
     withSpark { sc =>
-      val chain1 = (0 until 9).map(x => (x, x+1) )
-      val chain2 = (10 until 20).map(x => (x, x+1) )
+      val chain1 = (0 until 9).map(x => (x, x + 1))
+      val chain2 = (10 until 20).map(x => (x, x + 1))
       val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
       val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse
       val ccGraph = twoChains.connectedComponents()
-      val vertices = ccGraph.vertices.collect
+      val vertices = ccGraph.vertices.collect()
       for ( (id, cc) <- vertices ) {
         if (id < 10) {
           assert(cc === 0)
@@ -120,9 +120,9 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
       // Build the initial Graph
       val graph = Graph(users, relationships, defaultUser)
       val ccGraph = graph.connectedComponents()
-      val vertices = ccGraph.vertices.collect
+      val vertices = ccGraph.vertices.collect()
       for ( (id, cc) <- vertices ) {
-        assert(cc == 0)
+        assert(cc === 0)
       }
     }
   } // end of toy connected components

http://git-wip-us.apache.org/repos/asf/spark/blob/8d812f99/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index fc491ae..95804b0 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -19,15 +19,12 @@ package org.apache.spark.graphx.lib
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.lib._
 import org.apache.spark.graphx.util.GraphGenerators
-import org.apache.spark.rdd._
+
 
 object GridPageRank {
-  def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
+  def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double): Seq[(VertexId, Double)] = {
     val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
     val outDegree = Array.fill(nRows * nCols)(0)
     // Convert row column address into vertex ids (row major order)
@@ -35,13 +32,13 @@ object GridPageRank {
     // Make the grid graph
     for (r <- 0 until nRows; c <- 0 until nCols) {
       val ind = sub2ind(r,c)
-      if (r+1 < nRows) {
+      if (r + 1 < nRows) {
         outDegree(ind) += 1
-        inNbrs(sub2ind(r+1,c)) += ind
+        inNbrs(sub2ind(r + 1,c)) += ind
       }
-      if (c+1 < nCols) {
+      if (c + 1 < nCols) {
         outDegree(ind) += 1
-        inNbrs(sub2ind(r,c+1)) += ind
+        inNbrs(sub2ind(r,c + 1)) += ind
       }
     }
     // compute the pagerank
@@ -64,7 +61,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
 
   def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
     a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
-      .map { case (id, error) => error }.sum
+      .map { case (id, error) => error }.sum()
   }
 
   test("Star PageRank") {
@@ -80,12 +77,12 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
       // Static PageRank should only take 2 iterations to converge
       val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
         if (pr1 != pr2) 1 else 0
-      }.map { case (vid, test) => test }.sum
+      }.map { case (vid, test) => test }.sum()
       assert(notMatching === 0)
 
       val staticErrors = staticRanks2.map { case (vid, pr) =>
-        val correct = (vid > 0 && pr == resetProb) ||
-          (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
+        val p = math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) ))
+        val correct = (vid > 0 && pr == resetProb) || (vid == 0L && p < 1.0E-5)
         if (!correct) 1 else 0
       }
       assert(staticErrors.sum === 0)
@@ -95,8 +92,6 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
     }
   } // end of test Star PageRank
 
-
-
   test("Grid PageRank") {
     withSpark { sc =>
       val rows = 10
@@ -109,18 +104,18 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
 
       val staticRanks = gridGraph.staticPageRank(numIter, resetProb).vertices.cache()
       val dynamicRanks = gridGraph.pageRank(tol, resetProb).vertices.cache()
-      val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache()
+      val referenceRanks = VertexRDD(
+        sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache()
 
       assert(compareRanks(staticRanks, referenceRanks) < errorTol)
       assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
     }
   } // end of Grid PageRank
 
-
   test("Chain PageRank") {
     withSpark { sc =>
-      val chain1 = (0 until 9).map(x => (x, x+1) )
-      val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
+      val chain1 = (0 until 9).map(x => (x, x + 1))
+      val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong) }
       val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
       val resetProb = 0.15
       val tol = 0.0001

http://git-wip-us.apache.org/repos/asf/spark/blob/8d812f99/graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala
index df54aa3..1f658c3 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala
@@ -34,8 +34,8 @@ class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
       val edges = sc.parallelize(Seq.empty[Edge[Int]])
       val graph = Graph(vertices, edges)
       val sccGraph = graph.stronglyConnectedComponents(5)
-      for ((id, scc) <- sccGraph.vertices.collect) {
-        assert(id == scc)
+      for ((id, scc) <- sccGraph.vertices.collect()) {
+        assert(id === scc)
       }
     }
   }
@@ -45,8 +45,8 @@ class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
       val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7)))
       val graph = Graph.fromEdgeTuples(rawEdges, -1)
       val sccGraph = graph.stronglyConnectedComponents(20)
-      for ((id, scc) <- sccGraph.vertices.collect) {
-        assert(0L == scc)
+      for ((id, scc) <- sccGraph.vertices.collect()) {
+        assert(0L === scc)
       }
     }
   }
@@ -60,13 +60,14 @@ class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
       val rawEdges = sc.parallelize(edges)
       val graph = Graph.fromEdgeTuples(rawEdges, -1)
       val sccGraph = graph.stronglyConnectedComponents(20)
-      for ((id, scc) <- sccGraph.vertices.collect) {
-        if (id < 3)
-          assert(0L == scc)
-        else if (id < 6)
-          assert(3L == scc)
-        else
-          assert(id == scc)
+      for ((id, scc) <- sccGraph.vertices.collect()) {
+        if (id < 3) {
+          assert(0L === scc)
+        } else if (id < 6) {
+          assert(3L === scc)
+        } else {
+          assert(id === scc)
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org