You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2016/02/14 00:56:25 UTC

spark git commit: [SPARK-12363][MLLIB] Remove setRuns and fix PowerIterationClustering failed test

Repository: spark
Updated Branches:
  refs/heads/master 374c4b286 -> e3441e3f6


[SPARK-12363][MLLIB] Remove setRuns and fix PowerIterationClustering failed test

JIRA: https://issues.apache.org/jira/browse/SPARK-12363

This issue was pointed out by yanboliang. When `setRuns` is removed from PowerIterationClustering, one of the tests fails. I found that some `dstAttr`s of the normalized graph are not the correct values but 0.0. Setting `TripletFields.All` in `mapTriplets` makes it work.

Author: Liang-Chi Hsieh <vi...@gmail.com>
Author: Xiangrui Meng <me...@databricks.com>

Closes #10539 from viirya/fix-poweriter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3441e3f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3441e3f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3441e3f

Branch: refs/heads/master
Commit: e3441e3f68923224d5b576e6112917cf1fe1f89a
Parents: 374c4b2
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Sat Feb 13 15:56:20 2016 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Sat Feb 13 15:56:20 2016 -0800

----------------------------------------------------------------------
 .../mllib/PowerIterationClusteringExample.scala | 53 ++++++-------
 .../clustering/PowerIterationClustering.scala   | 24 +++---
 .../PowerIterationClusteringSuite.scala         | 79 +++++++++++---------
 python/pyspark/mllib/clustering.py              | 25 +++++--
 4 files changed, 96 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e3441e3f/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
index 9208d8e..bb9c1cb 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
@@ -40,27 +40,23 @@ import org.apache.spark.rdd.RDD
  *   n:  Number of sampled points on innermost circle.. There are proportionally more points
  *      within the outer/larger circles
  *   maxIterations:   Number of Power Iterations
- *   outerRadius:  radius of the outermost of the concentric circles
  * }}}
  *
  * Here is a sample run and output:
  *
- * ./bin/run-example mllib.PowerIterationClusteringExample -k 3 --n 30 --maxIterations 15
- *
- * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
- * 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
+ * ./bin/run-example mllib.PowerIterationClusteringExample -k 2 --n 10 --maxIterations 15
  *
+ * Cluster assignments: 1 -> [0,1,2,3,4,5,6,7,8,9],
+ *   0 -> [10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
  *
  * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
  */
 object PowerIterationClusteringExample {
 
   case class Params(
-      input: String = null,
-      k: Int = 3,
-      numPoints: Int = 5,
-      maxIterations: Int = 10,
-      outerRadius: Double = 3.0
+      k: Int = 2,
+      numPoints: Int = 10,
+      maxIterations: Int = 15
     ) extends AbstractParams[Params]
 
   def main(args: Array[String]) {
@@ -69,7 +65,7 @@ object PowerIterationClusteringExample {
     val parser = new OptionParser[Params]("PowerIterationClusteringExample") {
       head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
       opt[Int]('k', "k")
-        .text(s"number of circles (/clusters), default: ${defaultParams.k}")
+        .text(s"number of circles (clusters), default: ${defaultParams.k}")
         .action((x, c) => c.copy(k = x))
       opt[Int]('n', "n")
         .text(s"number of points in smallest circle, default: ${defaultParams.numPoints}")
@@ -77,9 +73,6 @@ object PowerIterationClusteringExample {
       opt[Int]("maxIterations")
         .text(s"number of iterations, default: ${defaultParams.maxIterations}")
         .action((x, c) => c.copy(maxIterations = x))
-      opt[Double]('r', "r")
-        .text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
-        .action((x, c) => c.copy(outerRadius = x))
     }
 
     parser.parse(args, defaultParams).map { params =>
@@ -97,20 +90,21 @@ object PowerIterationClusteringExample {
 
     Logger.getRootLogger.setLevel(Level.WARN)
 
-    val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
+    val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
     val model = new PowerIterationClustering()
       .setK(params.k)
       .setMaxIterations(params.maxIterations)
+      .setInitializationMode("degree")
       .run(circlesRdd)
 
     val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
-    val assignments = clusters.toList.sortBy { case (k, v) => v.length}
+    val assignments = clusters.toList.sortBy { case (k, v) => v.length }
     val assignmentsStr = assignments
       .map { case (k, v) =>
       s"$k -> ${v.sorted.mkString("[", ",", "]")}"
-    }.mkString(",")
+    }.mkString(", ")
     val sizesStr = assignments.map {
-      _._2.size
+      _._2.length
     }.sorted.mkString("(", ",", ")")
     println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
 
@@ -124,20 +118,17 @@ object PowerIterationClusteringExample {
     }
   }
 
-  def generateCirclesRdd(sc: SparkContext,
-      nCircles: Int = 3,
-      nPoints: Int = 30,
-      outerRadius: Double): RDD[(Long, Long, Double)] = {
-
-    val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)}
-    val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints}
-    val points = (0 until nCircles).flatMap { cx =>
-      generateCircle(radii(cx), groupSizes(cx))
+  def generateCirclesRdd(
+      sc: SparkContext,
+      nCircles: Int,
+      nPoints: Int): RDD[(Long, Long, Double)] = {
+    val points = (1 to nCircles).flatMap { i =>
+      generateCircle(i, i * nPoints)
     }.zipWithIndex
     val rdd = sc.parallelize(points)
     val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) =>
       if (i0 < i1) {
-        Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
+        Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1))))
       } else {
         None
       }
@@ -148,11 +139,9 @@ object PowerIterationClusteringExample {
   /**
    * Gaussian Similarity:  http://en.wikipedia.org/wiki/Radial_basis_function_kernel
    */
-  def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double): Double = {
-    val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
-    val expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0)
+  def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double)): Double = {
     val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
-    coeff * math.exp(expCoeff * ssquares)
+    math.exp(-ssquares / 2.0)
   }
 }
 // scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/e3441e3f/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 1ab7cb3..feacafe 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -25,7 +25,6 @@ import org.apache.spark.{Logging, SparkContext, SparkException}
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.impl.GraphImpl
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.{Loader, MLUtils, Saveable}
 import org.apache.spark.rdd.RDD
@@ -264,10 +263,12 @@ object PowerIterationClustering extends Logging {
       },
       mergeMsg = _ + _,
       TripletFields.EdgeOnly)
-    GraphImpl.fromExistingRDDs(vD, graph.edges)
+    Graph(vD, graph.edges)
       .mapTriplets(
         e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
-        TripletFields.Src)
+        new TripletFields(/* useSrc */ true,
+                          /* useDst */ false,
+                          /* useEdge */ true))
   }
 
   /**
@@ -293,10 +294,12 @@ object PowerIterationClustering extends Logging {
       },
       mergeMsg = _ + _,
       TripletFields.EdgeOnly)
-    GraphImpl.fromExistingRDDs(vD, gA.edges)
+    Graph(vD, gA.edges)
       .mapTriplets(
         e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
-        TripletFields.Src)
+        new TripletFields(/* useSrc */ true,
+                          /* useDst */ false,
+                          /* useEdge */ true))
   }
 
   /**
@@ -317,7 +320,7 @@ object PowerIterationClustering extends Logging {
       }, preservesPartitioning = true).cache()
     val sum = r.values.map(math.abs).sum()
     val v0 = r.mapValues(x => x / sum)
-    GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
+    Graph(VertexRDD(v0), g.edges)
   }
 
   /**
@@ -332,7 +335,7 @@ object PowerIterationClustering extends Logging {
   def initDegreeVector(g: Graph[Double, Double]): Graph[Double, Double] = {
     val sum = g.vertices.values.sum()
     val v0 = g.vertices.mapValues(_ / sum)
-    GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
+    Graph(VertexRDD(v0), g.edges)
   }
 
   /**
@@ -357,7 +360,9 @@ object PowerIterationClustering extends Logging {
       val v = curG.aggregateMessages[Double](
         sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
         mergeMsg = _ + _,
-        TripletFields.Dst).cache()
+        new TripletFields(/* useSrc */ false,
+                          /* useDst */ true,
+                          /* useEdge */ true)).cache()
       // normalize v
       val norm = v.values.map(math.abs).sum()
       logInfo(s"$msgPrefix: norm(v) = $norm.")
@@ -370,7 +375,7 @@ object PowerIterationClustering extends Logging {
       diffDelta = math.abs(delta - prevDelta)
       logInfo(s"$msgPrefix: diff(delta) = $diffDelta.")
       // update v
-      curG = GraphImpl.fromExistingRDDs(VertexRDD(v1), g.edges)
+      curG = Graph(VertexRDD(v1), g.edges)
       prevDelta = delta
     }
     curG.vertices
@@ -387,7 +392,6 @@ object PowerIterationClustering extends Logging {
     val points = v.mapValues(x => Vectors.dense(x)).cache()
     val model = new KMeans()
       .setK(k)
-      .setRuns(5)
       .setSeed(0L)
       .run(points.values)
     points.mapValues(p => model.predict(p)).cache()

http://git-wip-us.apache.org/repos/asf/spark/blob/e3441e3f/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
index 1890005..3d81d37 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
@@ -30,62 +30,65 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
 
   import org.apache.spark.mllib.clustering.PowerIterationClustering._
 
+  /** Generates a circle of points. */
+  private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
+    Array.tabulate(n) { i =>
+      val theta = 2.0 * math.Pi * i / n
+      (r * math.cos(theta), r * math.sin(theta))
+    }
+  }
+
+  /** Computes Gaussian similarity. */
+  private def sim(x: (Double, Double), y: (Double, Double)): Double = {
+    val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
+    math.exp(-dist2 / 2.0)
+  }
+
   test("power iteration clustering") {
-    /*
-     We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
-     edge (3, 4).
-
-     15-14 -13 -12
-     |           |
-     4 . 3 - 2  11
-     |   | x |   |
-     5   0 - 1  10
-     |           |
-     6 - 7 - 8 - 9
-     */
+    // Generate two circles following the example in the PIC paper.
+    val r1 = 1.0
+    val n1 = 10
+    val r2 = 4.0
+    val n2 = 40
+    val n = n1 + n2
+    val points = genCircle(r1, n1) ++ genCircle(r2, n2)
+    val similarities = for (i <- 1 until n; j <- 0 until i) yield {
+      (i.toLong, j.toLong, sim(points(i), points(j)))
+    }
 
-    val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
-      (1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
-      (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
-      (10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
     val model = new PowerIterationClustering()
       .setK(2)
+      .setMaxIterations(40)
       .run(sc.parallelize(similarities, 2))
     val predictions = Array.fill(2)(mutable.Set.empty[Long])
     model.assignments.collect().foreach { a =>
       predictions(a.cluster) += a.id
     }
-    assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+    assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
 
     val model2 = new PowerIterationClustering()
       .setK(2)
+      .setMaxIterations(10)
       .setInitializationMode("degree")
       .run(sc.parallelize(similarities, 2))
     val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
     model2.assignments.collect().foreach { a =>
       predictions2(a.cluster) += a.id
     }
-    assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+    assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
   }
 
   test("power iteration clustering on graph") {
-    /*
-     We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
-     edge (3, 4).
-
-     15-14 -13 -12
-     |           |
-     4 . 3 - 2  11
-     |   | x |   |
-     5   0 - 1  10
-     |           |
-     6 - 7 - 8 - 9
-     */
-
-    val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
-      (1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
-      (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
-      (10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
+    // Generate two circles following the example in the PIC paper.
+    val r1 = 1.0
+    val n1 = 10
+    val r2 = 4.0
+    val n2 = 40
+    val n = n1 + n2
+    val points = genCircle(r1, n1) ++ genCircle(r2, n2)
+    val similarities = for (i <- 1 until n; j <- 0 until i) yield {
+      (i.toLong, j.toLong, sim(points(i), points(j)))
+    }
 
     val edges = similarities.flatMap { case (i, j, s) =>
       if (i != j) {
@@ -98,22 +101,24 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
 
     val model = new PowerIterationClustering()
       .setK(2)
+      .setMaxIterations(40)
       .run(graph)
     val predictions = Array.fill(2)(mutable.Set.empty[Long])
     model.assignments.collect().foreach { a =>
       predictions(a.cluster) += a.id
     }
-    assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+    assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
 
     val model2 = new PowerIterationClustering()
       .setK(2)
+      .setMaxIterations(10)
       .setInitializationMode("degree")
       .run(sc.parallelize(similarities, 2))
     val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
     model2.assignments.collect().foreach { a =>
       predictions2(a.cluster) += a.id
     }
-    assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+    assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
   }
 
   test("normalize and powerIter") {

http://git-wip-us.apache.org/repos/asf/spark/blob/e3441e3f/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index ad04e46..5a5bf59 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -571,12 +571,25 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):
 
     Model produced by [[PowerIterationClustering]].
 
-    >>> data = [(0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0), (1, 3, 1.0),
-    ... (2, 3, 1.0), (3, 4, 0.1), (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0),
-    ... (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0), (10, 11, 1.0),
-    ... (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0)]
-    >>> rdd = sc.parallelize(data, 2)
-    >>> model = PowerIterationClustering.train(rdd, 2, 100)
+    >>> import math
+    >>> def genCircle(r, n):
+    ...   points = []
+    ...   for i in range(0, n):
+    ...     theta = 2.0 * math.pi * i / n
+    ...     points.append((r * math.cos(theta), r * math.sin(theta)))
+    ...   return points
+    >>> def sim(x, y):
+    ...   dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
+    ...   return math.exp(-dist2 / 2.0)
+    >>> r1 = 1.0
+    >>> n1 = 10
+    >>> r2 = 4.0
+    >>> n2 = 40
+    >>> n = n1 + n2
+    >>> points = genCircle(r1, n1) + genCircle(r2, n2)
+    >>> similarities = [(i, j, sim(points[i], points[j])) for i in range(1, n) for j in range(0, i)]
+    >>> rdd = sc.parallelize(similarities, 2)
+    >>> model = PowerIterationClustering.train(rdd, 2, 40)
     >>> model.k
     2
     >>> result = sorted(model.assignments().collect(), key=lambda x: x.id)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org