You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2016/02/26 06:16:03 UTC
spark git commit: [SPARK-12363] [MLLIB] [BACKPORT-1.3] Remove setRun
and fix PowerIterationClustering failed test
Repository: spark
Updated Branches:
refs/heads/branch-1.3 6ddde8eda -> 65cc451c8
[SPARK-12363] [MLLIB] [BACKPORT-1.3] Remove setRun and fix PowerIterationClustering failed test
## What changes were proposed in this pull request?
Backport JIRA-SPARK-12363 to branch-1.3.
## How was this patch tested?
Unit test.
cc mengxr
Author: Liang-Chi Hsieh <vi...@gmail.com>
Author: Xiangrui Meng <me...@databricks.com>
Closes #11265 from viirya/backport-12363-1.3 and squashes the following commits:
ec076dd [Liang-Chi Hsieh] Fix scala style.
7a3ef5f [Xiangrui Meng] use Graph instead of GraphImpl and update tests and example based on PIC paper
b86018d [Liang-Chi Hsieh] Remove setRun and fix PowerIterationClustering failed test.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65cc451c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65cc451c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65cc451c
Branch: refs/heads/branch-1.3
Commit: 65cc451c89fd03daed8c315a91d93067ccdc3a5c
Parents: 6ddde8e
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Thu Feb 25 21:15:59 2016 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Thu Feb 25 21:15:59 2016 -0800
----------------------------------------------------------------------
.../mllib/PowerIterationClusteringExample.scala | 53 ++++++++------------
.../clustering/PowerIterationClustering.scala | 18 ++++---
.../PowerIterationClusteringSuite.scala | 48 +++++++++++-------
3 files changed, 60 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/65cc451c/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
index 3f98ddd..c5b171d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
@@ -39,27 +39,23 @@ import org.apache.spark.{SparkConf, SparkContext}
* n: Number of sampled points on innermost circle. There are proportionally more points
* within the outer/larger circles
* maxIterations: Number of Power Iterations
- * outerRadius: radius of the outermost of the concentric circles
* }}}
*
* Here is a sample run and output:
*
- * ./bin/run-example mllib.PowerIterationClusteringExample -k 3 --n 30 --maxIterations 15
- *
- * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
- * 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
+ * ./bin/run-example mllib.PowerIterationClusteringExample -k 2 --n 10 --maxIterations 15
*
+ * Cluster assignments: 1 -> [0,1,2,3,4,5,6,7,8,9],
+ * 0 -> [10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
*
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
*/
object PowerIterationClusteringExample {
case class Params(
- input: String = null,
- k: Int = 3,
- numPoints: Int = 5,
- maxIterations: Int = 10,
- outerRadius: Double = 3.0
+ k: Int = 2,
+ numPoints: Int = 10,
+ maxIterations: Int = 15
) extends AbstractParams[Params]
def main(args: Array[String]) {
@@ -68,7 +64,7 @@ object PowerIterationClusteringExample {
val parser = new OptionParser[Params]("PowerIterationClusteringExample") {
head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
opt[Int]('k', "k")
- .text(s"number of circles (/clusters), default: ${defaultParams.k}")
+ .text(s"number of circles (clusters), default: ${defaultParams.k}")
.action((x, c) => c.copy(k = x))
opt[Int]('n', "n")
.text(s"number of points in smallest circle, default: ${defaultParams.numPoints}")
@@ -76,9 +72,6 @@ object PowerIterationClusteringExample {
opt[Int]("maxIterations")
.text(s"number of iterations, default: ${defaultParams.maxIterations}")
.action((x, c) => c.copy(maxIterations = x))
- opt[Double]('r', "r")
- .text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
- .action((x, c) => c.copy(outerRadius = x))
}
parser.parse(args, defaultParams).map { params =>
@@ -96,20 +89,21 @@ object PowerIterationClusteringExample {
Logger.getRootLogger.setLevel(Level.WARN)
- val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
+ val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
.setK(params.k)
.setMaxIterations(params.maxIterations)
+ .setInitializationMode("degree")
.run(circlesRdd)
val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
- val assignments = clusters.toList.sortBy { case (k, v) => v.length}
+ val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
- }.mkString(",")
+ }.mkString(", ")
val sizesStr = assignments.map {
- _._2.size
+ _._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
@@ -123,20 +117,17 @@ object PowerIterationClusteringExample {
}
}
- def generateCirclesRdd(sc: SparkContext,
- nCircles: Int = 3,
- nPoints: Int = 30,
- outerRadius: Double): RDD[(Long, Long, Double)] = {
-
- val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)}
- val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints}
- val points = (0 until nCircles).flatMap { cx =>
- generateCircle(radii(cx), groupSizes(cx))
+ def generateCirclesRdd(
+ sc: SparkContext,
+ nCircles: Int,
+ nPoints: Int): RDD[(Long, Long, Double)] = {
+ val points = (1 to nCircles).flatMap { i =>
+ generateCircle(i, i * nPoints)
}.zipWithIndex
val rdd = sc.parallelize(points)
val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) =>
if (i0 < i1) {
- Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
+ Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1))))
} else {
None
}
@@ -147,11 +138,9 @@ object PowerIterationClusteringExample {
/**
* Gaussian Similarity: http://en.wikipedia.org/wiki/Radial_basis_function_kernel
*/
- def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double) = {
- val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
- val expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0)
+ def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double)): Double = {
val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
- coeff * math.exp(expCoeff * ssquares)
+ math.exp(-ssquares / 2.0)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/65cc451c/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 1800239..a056eac 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -21,7 +21,6 @@ import org.apache.spark.{Logging, SparkException}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.graphx._
-import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
@@ -177,10 +176,12 @@ object PowerIterationClustering extends Logging {
},
mergeMsg = _ + _,
TripletFields.EdgeOnly)
- GraphImpl.fromExistingRDDs(vD, gA.edges)
+ Graph(vD, gA.edges)
.mapTriplets(
e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
- TripletFields.Src)
+ new TripletFields(/* useSrc */ true,
+ /* useDst */ false,
+ /* useEdge */ true))
}
/**
@@ -201,7 +202,7 @@ object PowerIterationClustering extends Logging {
}, preservesPartitioning = true).cache()
val sum = r.values.map(math.abs).sum()
val v0 = r.mapValues(x => x / sum)
- GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
+ Graph(VertexRDD(v0), g.edges)
}
/**
@@ -216,7 +217,7 @@ object PowerIterationClustering extends Logging {
def initDegreeVector(g: Graph[Double, Double]): Graph[Double, Double] = {
val sum = g.vertices.values.sum()
val v0 = g.vertices.mapValues(_ / sum)
- GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
+ Graph(VertexRDD(v0), g.edges)
}
/**
@@ -241,7 +242,9 @@ object PowerIterationClustering extends Logging {
val v = curG.aggregateMessages[Double](
sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
mergeMsg = _ + _,
- TripletFields.Dst).cache()
+ new TripletFields(/* useSrc */ false,
+ /* useDst */ true,
+ /* useEdge */ true)).cache()
// normalize v
val norm = v.values.map(math.abs).sum()
logInfo(s"$msgPrefix: norm(v) = $norm.")
@@ -254,7 +257,7 @@ object PowerIterationClustering extends Logging {
diffDelta = math.abs(delta - prevDelta)
logInfo(s"$msgPrefix: diff(delta) = $diffDelta.")
// update v
- curG = GraphImpl.fromExistingRDDs(VertexRDD(v1), g.edges)
+ curG = Graph(VertexRDD(v1), g.edges)
prevDelta = delta
}
curG.vertices
@@ -271,7 +274,6 @@ object PowerIterationClustering extends Logging {
val points = v.mapValues(x => Vectors.dense(x)).cache()
val model = new KMeans()
.setK(k)
- .setRuns(5)
.setSeed(0L)
.run(points.values)
points.mapValues(p => model.predict(p)).cache()
http://git-wip-us.apache.org/repos/asf/spark/blob/65cc451c/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
index ef84252..1cdd24f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
@@ -28,42 +28,52 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
import org.apache.spark.mllib.clustering.PowerIterationClustering._
+ /** Generates a circle of points. */
+ private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
+ Array.tabulate(n) { i =>
+ val theta = 2.0 * math.Pi * i / n
+ (r * math.cos(theta), r * math.sin(theta))
+ }
+ }
+
+ /** Computes Gaussian similarity. */
+ private def sim(x: (Double, Double), y: (Double, Double)): Double = {
+ val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
+ math.exp(-dist2 / 2.0)
+ }
+
test("power iteration clustering") {
- /*
- We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
- edge (3, 4).
-
- 15-14 -13 -12
- | |
- 4 . 3 - 2 11
- | | x | |
- 5 0 - 1 10
- | |
- 6 - 7 - 8 - 9
- */
+ // Generate two circles following the example in the PIC paper.
+ val r1 = 1.0
+ val n1 = 10
+ val r2 = 4.0
+ val n2 = 40
+ val n = n1 + n2
+ val points = genCircle(r1, n1) ++ genCircle(r2, n2)
+ val similarities = for (i <- 1 until n; j <- 0 until i) yield {
+ (i.toLong, j.toLong, sim(points(i), points(j)))
+ }
- val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
- (1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
- (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
- (10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
val model = new PowerIterationClustering()
.setK(2)
+ .setMaxIterations(40)
.run(sc.parallelize(similarities, 2))
val predictions = Array.fill(2)(mutable.Set.empty[Long])
model.assignments.collect().foreach { a =>
predictions(a.cluster) += a.id
}
- assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
-
+ assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
+
val model2 = new PowerIterationClustering()
.setK(2)
+ .setMaxIterations(10)
.setInitializationMode("degree")
.run(sc.parallelize(similarities, 2))
val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
model2.assignments.collect().foreach { a =>
predictions2(a.cluster) += a.id
}
- assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+ assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
}
test("normalize and powerIter") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org