Posted to commits@spark.apache.org by me...@apache.org on 2018/05/17 23:33:11 UTC
spark git commit: [SPARK-22884][ML] ML tests for StructuredStreaming: spark.ml.clustering
Repository: spark
Updated Branches:
refs/heads/master 439c69511 -> d4a0895c6
[SPARK-22884][ML] ML tests for StructuredStreaming: spark.ml.clustering
## What changes were proposed in this pull request?
This converts the clustering test suites to also check the code when run with structured streaming, using the ML testing infrastructure implemented in SPARK-22882; a sketch of the new test pattern follows the commit message below.
This PR is a new version of https://github.com/apache/spark/pull/20319
Author: Sandor Murakozi <sm...@gmail.com>
Author: Joseph K. Bradley <jo...@databricks.com>
Closes #21358 from jkbradley/smurakozi-SPARK-22884.
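For reference, here is a minimal sketch of the pattern this PR applies. ExampleClusteringSuite and its toy data are hypothetical; testTransformer and testTransformerByGlobalCheckFunc are the SPARK-22882 MLTest helpers that appear in the diffs below, and the comments describe their intended behavior.

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.util.MLTest
import org.apache.spark.sql.Row

class ExampleClusteringSuite extends MLTest {

  import testImplicits._

  test("predictions, checked in batch and streaming") {
    // Hypothetical toy data: two well-separated points.
    val df = Seq(Tuple1(Vectors.dense(0.0, 0.0)), Tuple1(Vectors.dense(10.0, 10.0)))
      .toDF("features")
    val model = new KMeans().setK(2).setSeed(1).fit(df)

    // Per-row check: MLTest runs the transformer over df both as a batch
    // DataFrame and as a streaming source, calling this function on each
    // output row (columns appear in the Row in the order listed).
    testTransformer[Tuple1[Vector]](df, model, "features", "prediction") {
      case Row(_, pred: Int) => assert(pred === 0 || pred === 1)
    }

    // Global check: all output rows are collected (in both modes) and
    // passed to one function, for assertions over the whole result set.
    testTransformerByGlobalCheckFunc[Tuple1[Vector]](df, model, "prediction") { rows =>
      assert(rows.map(_.getInt(0)).toSet === Set(0, 1))
    }
  }
}

The suites changed below follow exactly this shape: direct model.transform(...) plus collect() calls are replaced by these callbacks, so the same assertions also run against a streaming DataFrame.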
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4a0895c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4a0895c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4a0895c
Branch: refs/heads/master
Commit: d4a0895c628ca854895c3c35c46ed990af36ec61
Parents: 439c695
Author: Sandor Murakozi <sm...@gmail.com>
Authored: Thu May 17 16:33:06 2018 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Thu May 17 16:33:06 2018 -0700
----------------------------------------------------------------------
.../ml/clustering/BisectingKMeansSuite.scala | 41 ++++++++++----------
.../ml/clustering/GaussianMixtureSuite.scala | 22 ++++-------
.../spark/ml/clustering/KMeansSuite.scala | 31 +++++++--------
.../apache/spark/ml/clustering/LDASuite.scala | 21 ++++------
4 files changed, 50 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d4a0895c/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
index f3ff2af..81842af 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -19,17 +19,18 @@ package org.apache.spark.ml.clustering
import scala.language.existentials
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.clustering.DistanceMeasure
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.Dataset
-class BisectingKMeansSuite
- extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+
+class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
+
+ import testImplicits._
final val k = 5
@transient var dataset: Dataset[_] = _
@@ -68,10 +69,13 @@ class BisectingKMeansSuite
// Verify fit does not fail on very sparse data
val model = bkm.fit(sparseDataset)
- val result = model.transform(sparseDataset)
- val numClusters = result.select("prediction").distinct().collect().length
- // Verify we hit the edge case
- assert(numClusters < k && numClusters > 1)
+
+ testTransformerByGlobalCheckFunc[Tuple1[Vector]](sparseDataset.toDF(), model, "prediction") {
+ rows =>
+ val numClusters = rows.distinct.length
+ // Verify we hit the edge case
+ assert(numClusters < k && numClusters > 1)
+ }
}
test("setter/getter") {
@@ -104,19 +108,16 @@ class BisectingKMeansSuite
val bkm = new BisectingKMeans().setK(k).setPredictionCol(predictionColName).setSeed(1)
val model = bkm.fit(dataset)
assert(model.clusterCenters.length === k)
-
- val transformed = model.transform(dataset)
- val expectedColumns = Array("features", predictionColName)
- expectedColumns.foreach { column =>
- assert(transformed.columns.contains(column))
- }
- val clusters =
- transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet
- assert(clusters.size === k)
- assert(clusters === Set(0, 1, 2, 3, 4))
assert(model.computeCost(dataset) < 0.1)
assert(model.hasParent)
+ testTransformerByGlobalCheckFunc[Tuple1[Vector]](dataset.toDF(), model,
+ "features", predictionColName) { rows =>
+ val clusters = rows.map(_.getAs[Int](predictionColName)).toSet
+ assert(clusters.size === k)
+ assert(clusters === Set(0, 1, 2, 3, 4))
+ }
+
// Check validity of model summary
val numRows = dataset.count()
assert(model.hasSummary)
http://git-wip-us.apache.org/repos/asf/spark/blob/d4a0895c/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
index d0d461a..0b91f50 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -23,16 +23,15 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.{Dataset, Row}
-class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
- with DefaultReadWriteTest {
- import testImplicits._
+class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
+
import GaussianMixtureSuite._
+ import testImplicits._
final val k = 5
private val seed = 538009335
@@ -119,15 +118,10 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
assert(model.weights.length === k)
assert(model.gaussians.length === k)
- val transformed = model.transform(dataset)
- val expectedColumns = Array("features", predictionColName, probabilityColName)
- expectedColumns.foreach { column =>
- assert(transformed.columns.contains(column))
- }
-
// Check prediction matches the highest probability, and probabilities sum to one.
- transformed.select(predictionColName, probabilityColName).collect().foreach {
- case Row(pred: Int, prob: Vector) =>
+ testTransformer[Tuple1[Vector]](dataset.toDF(), model,
+ "features", predictionColName, probabilityColName) {
+ case Row(_, pred: Int, prob: Vector) =>
val probArray = prob.toArray
val predFromProb = probArray.zipWithIndex.maxBy(_._1)._2
assert(pred === predFromProb)
http://git-wip-us.apache.org/repos/asf/spark/blob/d4a0895c/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index 680a7c2..2569e7a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -22,20 +22,21 @@ import scala.util.Random
import org.dmg.pmml.{ClusteringModel, PMML}
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
-import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils, PMMLReadWriteTest}
import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
+import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans,
+ KMeansModel => MLlibKMeansModel}
import org.apache.spark.mllib.linalg.{Vectors => MLlibVectors}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
private[clustering] case class TestRow(features: Vector)
-class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest
- with PMMLReadWriteTest {
+class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTest {
+
+ import testImplicits._
final val k = 5
@transient var dataset: Dataset[_] = _
@@ -109,15 +110,13 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR
val model = kmeans.fit(dataset)
assert(model.clusterCenters.length === k)
- val transformed = model.transform(dataset)
- val expectedColumns = Array("features", predictionColName)
- expectedColumns.foreach { column =>
- assert(transformed.columns.contains(column))
+ testTransformerByGlobalCheckFunc[Tuple1[Vector]](dataset.toDF(), model,
+ "features", predictionColName) { rows =>
+ val clusters = rows.map(_.getAs[Int](predictionColName)).toSet
+ assert(clusters.size === k)
+ assert(clusters === Set(0, 1, 2, 3, 4))
}
- val clusters =
- transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet
- assert(clusters.size === k)
- assert(clusters === Set(0, 1, 2, 3, 4))
+
assert(model.computeCost(dataset) < 0.1)
assert(model.hasParent)
@@ -149,9 +148,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR
model.setFeaturesCol(featuresColName).setPredictionCol(predictionColName)
val transformed = model.transform(dataset.withColumnRenamed("features", featuresColName))
- Seq(featuresColName, predictionColName).foreach { column =>
- assert(transformed.columns.contains(column))
- }
+ assert(transformed.schema.fieldNames.toSet === Set(featuresColName, predictionColName))
assert(model.getFeaturesCol == featuresColName)
assert(model.getPredictionCol == predictionColName)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d4a0895c/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index 4d84820..096b541 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -21,11 +21,9 @@ import scala.language.existentials
import org.apache.hadoop.fs.Path
-import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql._
object LDASuite {
@@ -61,7 +59,7 @@ object LDASuite {
}
-class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class LDASuite extends MLTest with DefaultReadWriteTest {
import testImplicits._
@@ -186,16 +184,11 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
assert(model.topicsMatrix.numCols === k)
assert(!model.isDistributed)
- // transform()
- val transformed = model.transform(dataset)
- val expectedColumns = Array("features", lda.getTopicDistributionCol)
- expectedColumns.foreach { column =>
- assert(transformed.columns.contains(column))
- }
- transformed.select(lda.getTopicDistributionCol).collect().foreach { r =>
- val topicDistribution = r.getAs[Vector](0)
- assert(topicDistribution.size === k)
- assert(topicDistribution.toArray.forall(w => w >= 0.0 && w <= 1.0))
+ testTransformer[Tuple1[Vector]](dataset.toDF(), model,
+ "features", lda.getTopicDistributionCol) {
+ case Row(_, topicDistribution: Vector) =>
+ assert(topicDistribution.size === k)
+ assert(topicDistribution.toArray.forall(w => w >= 0.0 && w <= 1.0))
}
// logLikelihood, logPerplexity