You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2018/04/11 16:59:44 UTC
spark git commit: [SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M
Repository: spark
Updated Branches:
refs/heads/master 3cb82047f -> 75a183071
[SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M
## What changes were proposed in this pull request?
Adds structured streaming tests using testTransformer for these suites:
* IDF
* Imputer
* Interaction
* MaxAbsScaler
* MinHashLSH
* MinMaxScaler
* NGram
## How was this patch tested?
This change consists entirely of tests: the new and updated test suites listed above are its own verification.
Author: Joseph K. Bradley <jo...@databricks.com>
Closes #20964 from jkbradley/SPARK-22883-part2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75a18307
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75a18307
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75a18307
Branch: refs/heads/master
Commit: 75a183071c4ed2e407c930edfdf721779662b3ee
Parents: 3cb8204
Author: Joseph K. Bradley <jo...@databricks.com>
Authored: Wed Apr 11 09:59:38 2018 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Wed Apr 11 09:59:38 2018 -0700
----------------------------------------------------------------------
.../org/apache/spark/ml/feature/IDFSuite.scala | 14 +++---
.../apache/spark/ml/feature/ImputerSuite.scala | 31 ++++++++++---
.../spark/ml/feature/InteractionSuite.scala | 46 ++++++++++----------
.../spark/ml/feature/MaxAbsScalerSuite.scala | 14 +++---
.../spark/ml/feature/MinHashLSHSuite.scala | 25 ++++++++---
.../spark/ml/feature/MinMaxScalerSuite.scala | 14 +++---
.../apache/spark/ml/feature/NGramSuite.scala | 2 +-
7 files changed, 89 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index 005edf7..cdd62be 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -17,17 +17,15 @@
package org.apache.spark.ml.feature
-import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.Row
-class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class IDFSuite extends MLTest with DefaultReadWriteTest {
import testImplicits._
@@ -57,7 +55,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
Vectors.dense(0.0, 1.0, 2.0, 3.0),
Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
)
- val numOfData = data.size
+ val numOfData = data.length
val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
math.log((numOfData + 1.0) / (x + 1.0))
})
@@ -72,7 +70,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
MLTestingUtils.checkCopyAndUids(idfEst, idfModel)
- idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+ testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
case Row(x: Vector, y: Vector) =>
assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
}
@@ -85,7 +83,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
Vectors.dense(0.0, 1.0, 2.0, 3.0),
Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
)
- val numOfData = data.size
+ val numOfData = data.length
val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
if (x > 0) math.log((numOfData + 1.0) / (x + 1.0)) else 0
})
@@ -99,7 +97,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
.setMinDocFreq(1)
.fit(df)
- idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+ testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
case Row(x: Vector, y: Vector) =>
assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
index c08b35b..75f63a6 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
@@ -16,13 +16,12 @@
*/
package org.apache.spark.ml.feature
-import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.ml.util.DefaultReadWriteTest
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.SparkException
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.sql.{DataFrame, Row}
-class ImputerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class ImputerSuite extends MLTest with DefaultReadWriteTest {
test("Imputer for Double with default missing Value NaN") {
val df = spark.createDataFrame( Seq(
@@ -76,6 +75,28 @@ class ImputerSuite extends SparkFunSuite with MLlibTestSparkContext with Default
ImputerSuite.iterateStrategyTest(imputer, df)
}
+ test("Imputer should work with Structured Streaming") {
+ val localSpark = spark
+ import localSpark.implicits._
+ val df = Seq[(java.lang.Double, Double)](
+ (4.0, 4.0),
+ (10.0, 10.0),
+ (10.0, 10.0),
+ (Double.NaN, 8.0),
+ (null, 8.0)
+ ).toDF("value", "expected_mean_value")
+ val imputer = new Imputer()
+ .setInputCols(Array("value"))
+ .setOutputCols(Array("out"))
+ .setStrategy("mean")
+ val model = imputer.fit(df)
+ testTransformer[(java.lang.Double, Double)](df, model, "expected_mean_value", "out") {
+ case Row(exp: java.lang.Double, out: Double) =>
+ assert((exp.isNaN && out.isNaN) || (exp == out),
+ s"Imputed values differ. Expected: $exp, actual: $out")
+ }
+ }
+
test("Imputer throws exception when surrogate cannot be computed") {
val df = spark.createDataFrame( Seq(
(0, Double.NaN, 1.0, 1.0),
@@ -164,8 +185,6 @@ object ImputerSuite {
* @param df DataFrame with columns "id", "value", "expected_mean", "expected_median"
*/
def iterateStrategyTest(imputer: Imputer, df: DataFrame): Unit = {
- val inputCols = imputer.getInputCols
-
Seq("mean", "median").foreach { strategy =>
imputer.setStrategy(strategy)
val model = imputer.fit(df)
http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/InteractionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InteractionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InteractionSuite.scala
index 54f059e..eea31fc 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/InteractionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InteractionSuite.scala
@@ -19,15 +19,15 @@ package org.apache.spark.ml.feature
import scala.collection.mutable.ArrayBuilder
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.DefaultReadWriteTest
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
-class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class InteractionSuite extends MLTest with DefaultReadWriteTest {
import testImplicits._
@@ -63,9 +63,9 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
test("numeric interaction") {
val data = Seq(
- (2, Vectors.dense(3.0, 4.0)),
- (1, Vectors.dense(1.0, 5.0))
- ).toDF("a", "b")
+ (2, Vectors.dense(3.0, 4.0), Vectors.dense(6.0, 8.0)),
+ (1, Vectors.dense(1.0, 5.0), Vectors.dense(1.0, 5.0))
+ ).toDF("a", "b", "expected")
val groupAttr = new AttributeGroup(
"b",
Array[Attribute](
@@ -73,14 +73,15 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
NumericAttribute.defaultAttr.withName("bar")))
val df = data.select(
col("a").as("a", NumericAttribute.defaultAttr.toMetadata()),
- col("b").as("b", groupAttr.toMetadata()))
+ col("b").as("b", groupAttr.toMetadata()),
+ col("expected"))
val trans = new Interaction().setInputCols(Array("a", "b")).setOutputCol("features")
+ testTransformer[(Int, Vector, Vector)](df, trans, "features", "expected") {
+ case Row(features: Vector, expected: Vector) =>
+ assert(features === expected)
+ }
+
val res = trans.transform(df)
- val expected = Seq(
- (2, Vectors.dense(3.0, 4.0), Vectors.dense(6.0, 8.0)),
- (1, Vectors.dense(1.0, 5.0), Vectors.dense(1.0, 5.0))
- ).toDF("a", "b", "features")
- assert(res.collect() === expected.collect())
val attrs = AttributeGroup.fromStructField(res.schema("features"))
val expectedAttrs = new AttributeGroup(
"features",
@@ -92,9 +93,9 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
test("nominal interaction") {
val data = Seq(
- (2, Vectors.dense(3.0, 4.0)),
- (1, Vectors.dense(1.0, 5.0))
- ).toDF("a", "b")
+ (2, Vectors.dense(3.0, 4.0), Vectors.dense(0, 0, 0, 0, 3, 4)),
+ (1, Vectors.dense(1.0, 5.0), Vectors.dense(0, 0, 1, 5, 0, 0))
+ ).toDF("a", "b", "expected")
val groupAttr = new AttributeGroup(
"b",
Array[Attribute](
@@ -103,14 +104,15 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
val df = data.select(
col("a").as(
"a", NominalAttribute.defaultAttr.withValues(Array("up", "down", "left")).toMetadata()),
- col("b").as("b", groupAttr.toMetadata()))
+ col("b").as("b", groupAttr.toMetadata()),
+ col("expected"))
val trans = new Interaction().setInputCols(Array("a", "b")).setOutputCol("features")
+ testTransformer[(Int, Vector, Vector)](df, trans, "features", "expected") {
+ case Row(features: Vector, expected: Vector) =>
+ assert(features === expected)
+ }
+
val res = trans.transform(df)
- val expected = Seq(
- (2, Vectors.dense(3.0, 4.0), Vectors.dense(0, 0, 0, 0, 3, 4)),
- (1, Vectors.dense(1.0, 5.0), Vectors.dense(0, 0, 1, 5, 0, 0))
- ).toDF("a", "b", "features")
- assert(res.collect() === expected.collect())
val attrs = AttributeGroup.fromStructField(res.schema("features"))
val expectedAttrs = new AttributeGroup(
"features",
http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala
index 918da4f..8dd0f0c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala
@@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.spark.ml.feature
-import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.sql.Row
-class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class MaxAbsScalerSuite extends MLTest with DefaultReadWriteTest {
import testImplicits._
@@ -45,9 +44,10 @@ class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
.setOutputCol("scaled")
val model = scaler.fit(df)
- model.transform(df).select("expected", "scaled").collect()
- .foreach { case Row(vector1: Vector, vector2: Vector) =>
- assert(vector1.equals(vector2), s"MaxAbsScaler ut error: $vector2 should be $vector1")
+ testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
+ case Row(expectedVec: Vector, actualVec: Vector) =>
+ assert(expectedVec === actualVec,
+ s"MaxAbsScaler error: Expected $expectedVec but computed $actualVec")
}
MLTestingUtils.checkCopyAndUids(scaler, model)
http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala
index 3da0fb7..1c2956c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala
@@ -17,14 +17,13 @@
package org.apache.spark.ml.feature
-import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Dataset
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
+import org.apache.spark.sql.{Dataset, Row}
-class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+
+class MinHashLSHSuite extends MLTest with DefaultReadWriteTest {
@transient var dataset: Dataset[_] = _
@@ -175,4 +174,20 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
assert(precision == 1.0)
assert(recall >= 0.7)
}
+
+ test("MinHashLSHModel.transform should work with Structured Streaming") {
+ val localSpark = spark
+ import localSpark.implicits._
+
+ val model = new MinHashLSHModel("mh", randCoefficients = Array((1, 0)))
+ model.set(model.inputCol, "keys")
+ testTransformer[Tuple1[Vector]](dataset.toDF(), model, "keys", model.getOutputCol) {
+ case Row(_: Vector, output: Seq[_]) =>
+ assert(output.length === model.randCoefficients.length)
+ // no AND-amplification yet: SPARK-18450, so each hash output is of length 1
+ output.foreach {
+ case hashOutput: Vector => assert(hashOutput.size === 1)
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala
index 51db74e..2d965f2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala
@@ -17,13 +17,11 @@
package org.apache.spark.ml.feature
-import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.sql.Row
-class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class MinMaxScalerSuite extends MLTest with DefaultReadWriteTest {
import testImplicits._
@@ -48,9 +46,9 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
.setMax(5)
val model = scaler.fit(df)
- model.transform(df).select("expected", "scaled").collect()
- .foreach { case Row(vector1: Vector, vector2: Vector) =>
- assert(vector1.equals(vector2), "Transformed vector is different with expected.")
+ testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
+ case Row(vector1: Vector, vector2: Vector) =>
+ assert(vector1 === vector2, "Transformed vector is different with expected.")
}
MLTestingUtils.checkCopyAndUids(scaler, model)
@@ -114,7 +112,7 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
val model = scaler.fit(df)
model.transform(df).select("expected", "scaled").collect()
.foreach { case Row(vector1: Vector, vector2: Vector) =>
- assert(vector1.equals(vector2), "Transformed vector is different with expected.")
+ assert(vector1 === vector2, "Transformed vector is different with expected.")
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
index e5956ee..201a335 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
@@ -84,7 +84,7 @@ class NGramSuite extends MLTest with DefaultReadWriteTest {
def testNGram(t: NGram, dataFrame: DataFrame): Unit = {
testTransformer[(Seq[String], Seq[String])](dataFrame, t, "nGrams", "wantedNGrams") {
- case Row(actualNGrams : Seq[String], wantedNGrams: Seq[String]) =>
+ case Row(actualNGrams : Seq[_], wantedNGrams: Seq[_]) =>
assert(actualNGrams === wantedNGrams)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org