You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by WeichenXu123 <gi...@git.apache.org> on 2018/06/04 23:21:37 UTC
[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...
GitHub user WeichenXu123 opened a pull request:
https://github.com/apache/spark/pull/21493
[SPARK-15784] Add Power Iteration Clustering to spark.ml
## What changes were proposed in this pull request?
Following the discussion on JIRA, I rewrote the Power Iteration Clustering API in `spark.ml`.
## How was this patch tested?
Unit test.
Please review http://spark.apache.org/contributing.html before opening a pull request.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/WeichenXu123/spark pic_api
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21493.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21493
----
commit a605a2dba4243e5f1526bc239fd6dbe88dd13ce9
Author: WeichenXu <we...@...>
Date: 2018-06-04T23:12:41Z
init pr
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21493
Merged build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21493
**[Test build #91472 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91472/testReport)** for PR 21493 at commit [`15c087f`](https://github.com/apache/spark/commit/15c087fdf3f3f84732e084f7a327813065e678a1).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21493
**[Test build #91470 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91470/testReport)** for PR 21493 at commit [`a605a2d`](https://github.com/apache/spark/commit/a605a2dba4243e5f1526bc239fd6dbe88dd13ce9).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/21493#discussion_r192909750
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala ---
@@ -182,66 +137,59 @@ class PowerIterationClustering private[clustering] (
/** @group setParam */
@Since("2.4.0")
- def setIdCol(value: String): this.type = set(idCol, value)
+ def setSrcCol(value: String): this.type = set(srcCol, value)
/** @group setParam */
@Since("2.4.0")
- def setNeighborsCol(value: String): this.type = set(neighborsCol, value)
+ def setDstCol(value: String): this.type = set(dstCol, value)
/** @group setParam */
@Since("2.4.0")
- def setSimilaritiesCol(value: String): this.type = set(similaritiesCol, value)
+ def setWeightCol(value: String): this.type = set(weightCol, value)
+ /**
+ * @param dataset A dataset with columns src, dst, weight representing the affinity matrix,
+ * which is the matrix A in the PIC paper. Suppose the src column value is i,
+ * the dst column value is j, the weight column value is similarity s,,ij,,
+ * which must be nonnegative. This is a symmetric matrix and hence s,,ij,, = s,,ji,,.
+ * For any (i, j) with nonzero similarity, there should be either
+ * (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input. Rows with i = j are ignored,
+ * because we assume s,,ij,, = 0.0.
+ * @return A dataset that contains columns of vertex id and the corresponding cluster for the id.
+ * The schema of it will be:
+ * - id: Long
+ * - cluster: Int
+ */
@Since("2.4.0")
- override def transform(dataset: Dataset[_]): DataFrame = {
- transformSchema(dataset.schema, logging = true)
+ def assignClusters(dataset: Dataset[_]): DataFrame = {
+ val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) {
+ lit(1.0)
+ } else {
+ col($(weightCol)).cast(DoubleType)
+ }
- val sparkSession = dataset.sparkSession
- val idColValue = $(idCol)
- val rdd: RDD[(Long, Long, Double)] =
- dataset.select(
- col($(idCol)).cast(LongType),
- col($(neighborsCol)).cast(ArrayType(LongType, containsNull = false)),
- col($(similaritiesCol)).cast(ArrayType(DoubleType, containsNull = false))
- ).rdd.flatMap {
- case Row(id: Long, nbrs: Seq[_], sims: Seq[_]) =>
- require(nbrs.size == sims.size, s"The length of the neighbor ID list must be " +
- s"equal to the the length of the neighbor similarity list. Row for ID " +
- s"$idColValue=$id has neighbor ID list of length ${nbrs.length} but similarity list " +
- s"of length ${sims.length}.")
- nbrs.asInstanceOf[Seq[Long]].zip(sims.asInstanceOf[Seq[Double]]).map {
- case (nbr, similarity) => (id, nbr, similarity)
- }
- }
+ SchemaUtils.checkColumnTypes(dataset.schema, $(srcCol), Seq(IntegerType, LongType))
+ SchemaUtils.checkColumnTypes(dataset.schema, $(dstCol), Seq(IntegerType, LongType))
+ val rdd: RDD[(Long, Long, Double)] = dataset.select(
+ col($(srcCol)).cast(LongType),
+ col($(dstCol)).cast(LongType),
+ w).rdd.map {
+ case Row(src: Long, dst: Long, weight: Double) => (src, dst, weight)
+ }
val algorithm = new MLlibPowerIterationClustering()
.setK($(k))
.setInitializationMode($(initMode))
.setMaxIterations($(maxIter))
val model = algorithm.run(rdd)
- val predictionsRDD: RDD[Row] = model.assignments.map { assignment =>
+ val assignmentsRDD: RDD[Row] = model.assignments.map { assignment =>
--- End diff --
`model.assignments.toDF` should work.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the issue:
https://github.com/apache/spark/pull/21493
LGTM. Merged into master. Thanks! @WeichenXu123 Could you also add the Python API?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21493
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91472/
Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21493
Merged build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21493
**[Test build #91472 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91472/testReport)** for PR 21493 at commit [`15c087f`](https://github.com/apache/spark/commit/15c087fdf3f3f84732e084f7a327813065e678a1).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21493
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3808/
Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/21493#discussion_r192909491
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala ---
@@ -182,66 +137,59 @@ class PowerIterationClustering private[clustering] (
/** @group setParam */
@Since("2.4.0")
- def setIdCol(value: String): this.type = set(idCol, value)
+ def setSrcCol(value: String): this.type = set(srcCol, value)
/** @group setParam */
@Since("2.4.0")
- def setNeighborsCol(value: String): this.type = set(neighborsCol, value)
+ def setDstCol(value: String): this.type = set(dstCol, value)
/** @group setParam */
@Since("2.4.0")
- def setSimilaritiesCol(value: String): this.type = set(similaritiesCol, value)
+ def setWeightCol(value: String): this.type = set(weightCol, value)
+ /**
+ * @param dataset A dataset with columns src, dst, weight representing the affinity matrix,
--- End diff --
Should explain what this method does first.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21493
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3806/
Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21493
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91470/
Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21493
**[Test build #91470 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91470/testReport)** for PR 21493 at commit [`a605a2d`](https://github.com/apache/spark/commit/a605a2dba4243e5f1526bc239fd6dbe88dd13ce9).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21493
Merged build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/21493#discussion_r192910578
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala ---
@@ -222,17 +167,13 @@ object PowerIterationClusteringSuite {
val n = n1 + n2
val points = genCircle(r1, n1) ++ genCircle(r2, n2)
- val rows = for (i <- 1 until n) yield {
- val neighbors = for (j <- 0 until i) yield {
- j.toLong
+ val rows = (for (i <- 1 until n) yield {
+ for (j <- 0 until i) yield {
+ (i.toLong, j.toLong, sim(points(i), points(j)))
}
- val similarities = for (j <- 0 until i) yield {
- sim(points(i), points(j))
- }
- (i.toLong, neighbors.toArray, similarities.toArray)
- }
+ }).flatMap(_.iterator)
- spark.createDataFrame(rows).toDF("id", "neighbors", "similarities")
+ spark.createDataFrame(rows).toDF("src", "dst", "weight")
}
}
--- End diff --
Should test default weight.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21493
Merged build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/21493#discussion_r192909369
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala ---
@@ -66,62 +65,35 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has
def getInitMode: String = $(initMode)
/**
- * Param for the name of the input column for vertex IDs.
- * Default: "id"
+ * Param for the name of the input column for source vertex IDs.
+ * Default: "src"
* @group param
*/
@Since("2.4.0")
- val idCol = new Param[String](this, "idCol", "Name of the input column for vertex IDs.",
+ val srcCol = new Param[String](this, "srcCol", "Name of the input column for source vertex IDs.",
(value: String) => value.nonEmpty)
- setDefault(idCol, "id")
+ setDefault(srcCol, "src")
/** @group getParam */
@Since("2.4.0")
- def getIdCol: String = getOrDefault(idCol)
+ def getSrcCol: String = getOrDefault(srcCol)
/**
- * Param for the name of the input column for neighbors in the adjacency list representation.
- * Default: "neighbors"
+ * Name of the input column for destination vertex IDs.
+ * Default: "dst"
* @group param
*/
@Since("2.4.0")
- val neighborsCol = new Param[String](this, "neighborsCol",
- "Name of the input column for neighbors in the adjacency list representation.",
+ val dstCol = new Param[String](this, "dstCol",
+ "Name of the input column for destination vertex IDs.",
(value: String) => value.nonEmpty)
- setDefault(neighborsCol, "neighbors")
+ setDefault(dstCol, "dst")
--- End diff --
Could you put all default values in a single `setDefault` call?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/21493
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/21493#discussion_r192910421
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala ---
@@ -62,136 +61,82 @@ class PowerIterationClusteringSuite extends SparkFunSuite
new PowerIterationClustering().setInitMode("no_such_a_mode")
}
intercept[IllegalArgumentException] {
- new PowerIterationClustering().setIdCol("")
+ new PowerIterationClustering().setSrcCol("")
}
intercept[IllegalArgumentException] {
- new PowerIterationClustering().setNeighborsCol("")
- }
- intercept[IllegalArgumentException] {
- new PowerIterationClustering().setSimilaritiesCol("")
+ new PowerIterationClustering().setDstCol("")
}
}
test("power iteration clustering") {
val n = n1 + n2
- val model = new PowerIterationClustering()
+ val result = new PowerIterationClustering()
.setK(2)
.setMaxIter(40)
- val result = model.transform(data)
-
- val predictions = Array.fill(2)(mutable.Set.empty[Long])
- result.select("id", "prediction").collect().foreach {
- case Row(id: Long, cluster: Integer) => predictions(cluster) += id
- }
- assert(predictions.toSet == Set((1 until n1).toSet, (n1 until n).toSet))
+ .setWeightCol("weight")
+ .assignClusters(data).as[(Long, Int)].collect().toSet
--- End diff --
It is better to split a long chain of methods.
~~~scala
val assignments = new ...
...
.assignClusters(...)
val localAssignments = assignments
.select('id, 'cluster) // needed because we didn't put a contract on the column order
.as[(Long, Int)]
.collect()
.toSet
~~~
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org