Posted to commits@spark.apache.org by yl...@apache.org on 2016/12/28 15:01:38 UTC

spark git commit: [SPARK-17772][ML][TEST] Add test functions for ML sample weights

Repository: spark
Updated Branches:
  refs/heads/master d7bce3bd3 -> 6a475ae46


[SPARK-17772][ML][TEST] Add test functions for ML sample weights

## What changes were proposed in this pull request?

More and more ML algorithms are accepting sample weights, but so far their weight handling has been tested heterogeneously and with duplicated code. This patch adds extensible helper methods to `MLTestingUtils` that can be reused by the test suite of any algorithm that accepts sample weights. Up to now, two tests have commonly been implemented:

* Check that oversampling instances is equivalent to giving those instances sample weights proportional to the oversampling counts (sketched below)
* Check that outliers with tiny sample weights do not affect the fitted model
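
To make the first property concrete, here is a minimal sketch (illustration only, not code from this patch) of two datasets that should produce the same fitted model:

```scala
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.Vectors

// A hypothetical point: label 1.0 with a single feature.
val features = Vectors.dense(2.0)

// Oversampled: the point appears three times with unit weight ...
val overSampled = Seq.fill(3)(Instance(1.0, 1.0, features))

// ... which should fit the same model as one copy carrying weight 3.0.
val weighted = Seq(Instance(1.0, 3.0, features))
```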

This patch adds an additional test:

* Check that algorithms are invariant to constant scaling of the sample weights, i.e. uniform sample weights with `w_i = 1.0` are effectively the same as uniform sample weights with `w_i = 10000` or `w_i = 0.0001` (see the illustration below)
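
A quick illustration of that invariant (sketch only; `train` is an assumed DataFrame with the usual "label" and "features" columns):

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.functions.lit

val lr = new LogisticRegression().setWeightCol("weight")
// `train` is assumed: any DataFrame with "label" and "features" columns.
val m1 = lr.fit(train.withColumn("weight", lit(1.0)))
val m2 = lr.fit(train.withColumn("weight", lit(10000.0)))
// m1 and m2 should agree on coefficients and intercept up to tolerance.
```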

Instances of these tests lived in the LinearRegression, NaiveBayes, and LogisticRegression suites; those tests have been removed or modified to use the new helper methods. The helpers will also be useful when [SPARK-9478](https://issues.apache.org/jira/browse/SPARK-9478) is implemented.
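
For reference, the usage pattern each suite now follows, condensed from the LogisticRegression changes in the diff below (the suite's `dataset`, `seed`, and `~==` test matchers are in scope):

```scala
def modelEquals(m1: LogisticRegressionModel, m2: LogisticRegressionModel): Unit = {
  assert(m1.coefficientMatrix ~== m2.coefficientMatrix absTol 0.05)
  assert(m1.interceptVector ~== m2.interceptVector absTol 0.05)
}
val estimator = new LogisticRegression().setFamily("binomial")
// Constant scaling of the weights should not change the model.
MLTestingUtils.testArbitrarilyScaledWeights[LogisticRegressionModel, LogisticRegression](
  dataset.as[LabeledPoint], estimator, modelEquals)
// Outliers with tiny weights should not change the model (2 = numClasses).
MLTestingUtils.testOutliersWithSmallWeights[LogisticRegressionModel, LogisticRegression](
  dataset.as[LabeledPoint], estimator, 2, modelEquals)
// Oversampling should be equivalent to proportional weighting.
MLTestingUtils.testOversamplingVsWeighting[LogisticRegressionModel, LogisticRegression](
  dataset.as[LabeledPoint], estimator, modelEquals, seed)
```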

## How was this patch tested?

This patch only modifies test suites; no non-test code changes.

## Other notes

Both IsotonicRegression and GeneralizedLinearRegression also extend `HasWeightCol`. I did not modify their test suites, both to keep this patch easier to review and because they do not duplicate the tests in the three suites that were modified. If we want to migrate them later, we can create a JIRA for it now, but that is open for debate.
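
Purely as a hypothetical sketch of such a follow-up (the `glrDataset` name and the tolerances are assumptions, not code from this patch), GeneralizedLinearRegressionSuite could plug into the same helpers:

```scala
def modelEquals(
    m1: GeneralizedLinearRegressionModel,
    m2: GeneralizedLinearRegressionModel): Unit = {
  assert(m1.coefficients ~== m2.coefficients relTol 0.01)
  assert(m1.intercept ~== m2.intercept relTol 0.01)
}
val glr = new GeneralizedLinearRegression().setFamily("gaussian")
// `glrDataset` is an assumed DataFrame of labeled points defined by the suite.
MLTestingUtils.testArbitrarilyScaledWeights[
    GeneralizedLinearRegressionModel, GeneralizedLinearRegression](
  glrDataset.as[LabeledPoint], glr, modelEquals)
```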

Author: sethah <se...@gmail.com>

Closes #15721 from sethah/SPARK-17772.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a475ae4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a475ae4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a475ae4

Branch: refs/heads/master
Commit: 6a475ae466a7ce28d507244bf6db91be06ed81ef
Parents: d7bce3b
Author: sethah <se...@gmail.com>
Authored: Wed Dec 28 07:01:14 2016 -0800
Committer: Yanbo Liang <yb...@gmail.com>
Committed: Wed Dec 28 07:01:14 2016 -0800

----------------------------------------------------------------------
 .../LogisticRegressionSuite.scala               |  60 +++-------
 .../ml/classification/NaiveBayesSuite.scala     |  81 +++++--------
 .../ml/regression/LinearRegressionSuite.scala   | 120 ++++++-------------
 .../apache/spark/ml/util/MLTestingUtils.scala   | 111 +++++++++++------
 4 files changed, 154 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6a475ae4/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index f8bcbee..1308210 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -1836,52 +1836,24 @@ class LogisticRegressionSuite
         .forall(x => x(0) >= x(1)))
   }
 
-  test("binary logistic regression with weighted data") {
-    val numClasses = 2
-    val numPoints = 40
-    val outlierData = MLTestingUtils.genClassificationInstancesWithWeightedOutliers(spark,
-      numClasses, numPoints)
-    val testData = Array.tabulate[LabeledPoint](numClasses) { i =>
-      LabeledPoint(i.toDouble, Vectors.dense(i.toDouble))
-    }.toSeq.toDF()
-    val lr = new LogisticRegression().setFamily("binomial").setWeightCol("weight")
-    val model = lr.fit(outlierData)
-    val results = model.transform(testData).select("label", "prediction").collect()
-
-    // check that the predictions are the one to one mapping
-    results.foreach { case Row(label: Double, pred: Double) =>
-      assert(label === pred)
+  test("logistic regression with sample weights") {
+    def modelEquals(m1: LogisticRegressionModel, m2: LogisticRegressionModel): Unit = {
+      assert(m1.coefficientMatrix ~== m2.coefficientMatrix absTol 0.05)
+      assert(m1.interceptVector ~== m2.interceptVector absTol 0.05)
     }
-    val (overSampledData, weightedData) =
-      MLTestingUtils.genEquivalentOversampledAndWeightedInstances(outlierData, "label", "features",
-        42L)
-    val weightedModel = lr.fit(weightedData)
-    val overSampledModel = lr.setWeightCol("").fit(overSampledData)
-    assert(weightedModel.coefficientMatrix ~== overSampledModel.coefficientMatrix relTol 0.01)
-  }
-
-  test("multinomial logistic regression with weighted data") {
-    val numClasses = 5
-    val numPoints = 40
-    val outlierData = MLTestingUtils.genClassificationInstancesWithWeightedOutliers(spark,
-      numClasses, numPoints)
-    val testData = Array.tabulate[LabeledPoint](numClasses) { i =>
-      LabeledPoint(i.toDouble, Vectors.dense(i.toDouble))
-    }.toSeq.toDF()
-    val mlr = new LogisticRegression().setFamily("multinomial").setWeightCol("weight")
-    val model = mlr.fit(outlierData)
-    val results = model.transform(testData).select("label", "prediction").collect()
-
-    // check that the predictions are the one to one mapping
-    results.foreach { case Row(label: Double, pred: Double) =>
-      assert(label === pred)
+    val testParams = Seq(
+      ("binomial", smallBinaryDataset, 2),
+      ("multinomial", smallMultinomialDataset, 3)
+    )
+    testParams.foreach { case (family, dataset, numClasses) =>
+      val estimator = new LogisticRegression().setFamily(family)
+      MLTestingUtils.testArbitrarilyScaledWeights[LogisticRegressionModel, LogisticRegression](
+        dataset.as[LabeledPoint], estimator, modelEquals)
+      MLTestingUtils.testOutliersWithSmallWeights[LogisticRegressionModel, LogisticRegression](
+        dataset.as[LabeledPoint], estimator, numClasses, modelEquals)
+      MLTestingUtils.testOversamplingVsWeighting[LogisticRegressionModel, LogisticRegression](
+        dataset.as[LabeledPoint], estimator, modelEquals, seed)
     }
-    val (overSampledData, weightedData) =
-      MLTestingUtils.genEquivalentOversampledAndWeightedInstances(outlierData, "label", "features",
-        42L)
-    val weightedModel = mlr.fit(weightedData)
-    val overSampledModel = mlr.setWeightCol("").fit(overSampledData)
-    assert(weightedModel.coefficientMatrix ~== overSampledModel.coefficientMatrix relTol 0.01)
   }
 
   test("set family") {

http://git-wip-us.apache.org/repos/asf/spark/blob/6a475ae4/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
index e934e5e..2a69ef1 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
@@ -38,18 +38,22 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
   import testImplicits._
 
   @transient var dataset: Dataset[_] = _
+  @transient var bernoulliDataset: Dataset[_] = _
+
+  private val seed = 42
 
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    val pi = Array(0.5, 0.1, 0.4).map(math.log)
+    val pi = Array(0.3, 0.3, 0.4).map(math.log)
     val theta = Array(
-      Array(0.70, 0.10, 0.10, 0.10), // label 0
-      Array(0.10, 0.70, 0.10, 0.10), // label 1
-      Array(0.10, 0.10, 0.70, 0.10)  // label 2
+      Array(0.30, 0.30, 0.30, 0.30), // label 0
+      Array(0.30, 0.30, 0.30, 0.30), // label 1
+      Array(0.40, 0.40, 0.40, 0.40)  // label 2
     ).map(_.map(math.log))
 
-    dataset = generateNaiveBayesInput(pi, theta, 100, 42).toDF()
+    dataset = generateNaiveBayesInput(pi, theta, 100, seed).toDF()
+    bernoulliDataset = generateNaiveBayesInput(pi, theta, 100, seed, "bernoulli").toDF()
   }
 
   def validatePrediction(predictionAndLabels: DataFrame): Unit = {
@@ -139,7 +143,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     val theta = new DenseMatrix(3, 4, thetaArray.flatten, true)
 
     val testDataset =
-      generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial").toDF()
+      generateNaiveBayesInput(piArray, thetaArray, nPoints, seed, "multinomial").toDF()
     val nb = new NaiveBayes().setSmoothing(1.0).setModelType("multinomial")
     val model = nb.fit(testDataset)
 
@@ -157,50 +161,27 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     validateProbabilities(featureAndProbabilities, model, "multinomial")
   }
 
-  test("Naive Bayes Multinomial with weighted samples") {
-    val nPoints = 1000
-    val piArray = Array(0.5, 0.1, 0.4).map(math.log)
-    val thetaArray = Array(
-      Array(0.70, 0.10, 0.10, 0.10), // label 0
-      Array(0.10, 0.70, 0.10, 0.10), // label 1
-      Array(0.10, 0.10, 0.70, 0.10) // label 2
-    ).map(_.map(math.log))
-
-    val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial").toDF()
-    val (overSampledData, weightedData) =
-      MLTestingUtils.genEquivalentOversampledAndWeightedInstances(testData,
-        "label", "features", 42L)
-    val nb = new NaiveBayes().setModelType("multinomial")
-    val unweightedModel = nb.fit(weightedData)
-    val overSampledModel = nb.fit(overSampledData)
-    val weightedModel = nb.setWeightCol("weight").fit(weightedData)
-    assert(weightedModel.theta ~== overSampledModel.theta relTol 0.001)
-    assert(weightedModel.pi ~== overSampledModel.pi relTol 0.001)
-    assert(unweightedModel.theta !~= overSampledModel.theta relTol 0.001)
-    assert(unweightedModel.pi !~= overSampledModel.pi relTol 0.001)
-  }
-
-  test("Naive Bayes Bernoulli with weighted samples") {
-    val nPoints = 10000
-    val piArray = Array(0.5, 0.3, 0.2).map(math.log)
-    val thetaArray = Array(
-      Array(0.50, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.40), // label 0
-      Array(0.02, 0.70, 0.10, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02), // label 1
-      Array(0.02, 0.02, 0.60, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.30)  // label 2
-    ).map(_.map(math.log))
-
-    val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "bernoulli").toDF()
-    val (overSampledData, weightedData) =
-      MLTestingUtils.genEquivalentOversampledAndWeightedInstances(testData,
-        "label", "features", 42L)
-    val nb = new NaiveBayes().setModelType("bernoulli")
-    val unweightedModel = nb.fit(weightedData)
-    val overSampledModel = nb.fit(overSampledData)
-    val weightedModel = nb.setWeightCol("weight").fit(weightedData)
-    assert(weightedModel.theta ~== overSampledModel.theta relTol 0.001)
-    assert(weightedModel.pi ~== overSampledModel.pi relTol 0.001)
-    assert(unweightedModel.theta !~= overSampledModel.theta relTol 0.001)
-    assert(unweightedModel.pi !~= overSampledModel.pi relTol 0.001)
+  test("Naive Bayes with weighted samples") {
+    val numClasses = 3
+    def modelEquals(m1: NaiveBayesModel, m2: NaiveBayesModel): Unit = {
+      assert(m1.pi ~== m2.pi relTol 0.01)
+      assert(m1.theta ~== m2.theta relTol 0.01)
+    }
+    val testParams = Seq(
+      ("bernoulli", bernoulliDataset),
+      ("multinomial", dataset)
+    )
+    testParams.foreach { case (family, dataset) =>
+      // NaiveBayes is sensitive to constant scaling of the weights unless smoothing is set to 0
+      val estimatorNoSmoothing = new NaiveBayes().setSmoothing(0.0).setModelType(family)
+      val estimatorWithSmoothing = new NaiveBayes().setModelType(family)
+      MLTestingUtils.testArbitrarilyScaledWeights[NaiveBayesModel, NaiveBayes](
+        dataset.as[LabeledPoint], estimatorNoSmoothing, modelEquals)
+      MLTestingUtils.testOutliersWithSmallWeights[NaiveBayesModel, NaiveBayes](
+        dataset.as[LabeledPoint], estimatorWithSmoothing, numClasses, modelEquals)
+      MLTestingUtils.testOversamplingVsWeighting[NaiveBayesModel, NaiveBayes](
+        dataset.as[LabeledPoint], estimatorWithSmoothing, modelEquals, seed)
+    }
   }
 
   test("Naive Bayes Bernoulli") {

http://git-wip-us.apache.org/repos/asf/spark/blob/6a475ae4/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
index 0be8274..e05d0c9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -36,6 +36,7 @@ class LinearRegressionSuite
 
   private val seed: Int = 42
   @transient var datasetWithDenseFeature: DataFrame = _
+  @transient var datasetWithStrongNoise: DataFrame = _
   @transient var datasetWithDenseFeatureWithoutIntercept: DataFrame = _
   @transient var datasetWithSparseFeature: DataFrame = _
   @transient var datasetWithWeight: DataFrame = _
@@ -47,6 +48,11 @@ class LinearRegressionSuite
     datasetWithDenseFeature = sc.parallelize(LinearDataGenerator.generateLinearInput(
       intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
       xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML).toDF()
+
+    datasetWithStrongNoise = sc.parallelize(LinearDataGenerator.generateLinearInput(
+      intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
+      xVariance = Array(0.7, 1.2), nPoints = 100, seed, eps = 5.0), 2).map(_.asML).toDF()
+
     /*
        datasetWithDenseFeatureWithoutIntercept is not needed for correctness testing
        but is useful for illustrating training model without intercept
@@ -95,6 +101,7 @@ class LinearRegressionSuite
       Instance(17.0, 3.0, Vectors.dense(2.0, 11.0)),
       Instance(17.0, 4.0, Vectors.dense(3.0, 13.0))
     ), 2).toDF()
+
     datasetWithWeightZeroLabel = sc.parallelize(Seq(
       Instance(0.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
       Instance(0.0, 2.0, Vectors.dense(1.0, 7.0)),
@@ -810,91 +817,34 @@ class LinearRegressionSuite
   }
 
   test("linear regression with weighted samples") {
-    Seq("auto", "l-bfgs", "normal").foreach { solver =>
-      val (data, weightedData) = {
-        val activeData = LinearDataGenerator.generateLinearInput(
-          6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 500, 1, 0.1).map(_.asML)
-
-        val rnd = new Random(8392)
-        val signedData = activeData.map { case p: LabeledPoint =>
-          (rnd.nextGaussian() > 0.0, p)
-        }
-
-        val data1 = signedData.flatMap {
-          case (true, p) => Iterator(p, p)
-          case (false, p) => Iterator(p)
-        }
-
-        val weightedSignedData = signedData.flatMap {
-          case (true, LabeledPoint(label, features)) =>
-            Iterator(
-              Instance(label, weight = 1.2, features),
-              Instance(label, weight = 0.8, features)
-            )
-          case (false, LabeledPoint(label, features)) =>
-            Iterator(
-              Instance(label, weight = 0.3, features),
-              Instance(label, weight = 0.1, features),
-              Instance(label, weight = 0.6, features)
-            )
-        }
-
-        val noiseData = LinearDataGenerator.generateLinearInput(
-          2, Array(1, 3), Array(0.9, -1.3), Array(0.7, 1.2), 500, 1, 0.1).map(_.asML)
-        val weightedNoiseData = noiseData.map {
-          case LabeledPoint(label, features) => Instance(label, weight = 0, features)
-        }
-        val data2 = weightedSignedData ++ weightedNoiseData
-
-        (sc.parallelize(data1, 4).toDF(), sc.parallelize(data2, 4).toDF())
-      }
-
-      val trainer1a = (new LinearRegression).setFitIntercept(true)
-        .setElasticNetParam(0.0).setRegParam(0.21).setStandardization(true).setSolver(solver)
-      val trainer1b = (new LinearRegression).setFitIntercept(true).setWeightCol("weight")
-        .setElasticNetParam(0.0).setRegParam(0.21).setStandardization(true).setSolver(solver)
-
-      // Normal optimizer is not supported with non-zero elasticnet parameter.
-      val model1a0 = trainer1a.fit(data)
-      val model1a1 = trainer1a.fit(weightedData)
-      val model1b = trainer1b.fit(weightedData)
-
-      assert(model1a0.coefficients !~= model1a1.coefficients absTol 1E-3)
-      assert(model1a0.intercept !~= model1a1.intercept absTol 1E-3)
-      assert(model1a0.coefficients ~== model1b.coefficients absTol 1E-3)
-      assert(model1a0.intercept ~== model1b.intercept absTol 1E-3)
-
-      val trainer2a = (new LinearRegression).setFitIntercept(true)
-        .setElasticNetParam(0.0).setRegParam(0.21).setStandardization(false).setSolver(solver)
-      val trainer2b = (new LinearRegression).setFitIntercept(true).setWeightCol("weight")
-        .setElasticNetParam(0.0).setRegParam(0.21).setStandardization(false).setSolver(solver)
-      val model2a0 = trainer2a.fit(data)
-      val model2a1 = trainer2a.fit(weightedData)
-      val model2b = trainer2b.fit(weightedData)
-      assert(model2a0.coefficients !~= model2a1.coefficients absTol 1E-3)
-      assert(model2a0.intercept !~= model2a1.intercept absTol 1E-3)
-      assert(model2a0.coefficients ~== model2b.coefficients absTol 1E-3)
-      assert(model2a0.intercept ~== model2b.intercept absTol 1E-3)
-
-      val trainer3a = (new LinearRegression).setFitIntercept(false)
-        .setElasticNetParam(0.0).setRegParam(0.21).setStandardization(true).setSolver(solver)
-      val trainer3b = (new LinearRegression).setFitIntercept(false).setWeightCol("weight")
-        .setElasticNetParam(0.0).setRegParam(0.21).setStandardization(true).setSolver(solver)
-      val model3a0 = trainer3a.fit(data)
-      val model3a1 = trainer3a.fit(weightedData)
-      val model3b = trainer3b.fit(weightedData)
-      assert(model3a0.coefficients !~= model3a1.coefficients absTol 1E-3)
-      assert(model3a0.coefficients ~== model3b.coefficients absTol 1E-3)
-
-      val trainer4a = (new LinearRegression).setFitIntercept(false)
-        .setElasticNetParam(0.0).setRegParam(0.21).setStandardization(false).setSolver(solver)
-      val trainer4b = (new LinearRegression).setFitIntercept(false).setWeightCol("weight")
-        .setElasticNetParam(0.0).setRegParam(0.21).setStandardization(false).setSolver(solver)
-      val model4a0 = trainer4a.fit(data)
-      val model4a1 = trainer4a.fit(weightedData)
-      val model4b = trainer4b.fit(weightedData)
-      assert(model4a0.coefficients !~= model4a1.coefficients absTol 1E-3)
-      assert(model4a0.coefficients ~== model4b.coefficients absTol 1E-3)
+    val sqlContext = spark.sqlContext
+    import sqlContext.implicits._
+    val numClasses = 0
+    def modelEquals(m1: LinearRegressionModel, m2: LinearRegressionModel): Unit = {
+      assert(m1.coefficients ~== m2.coefficients relTol 0.01)
+      assert(m1.intercept ~== m2.intercept relTol 0.01)
+    }
+    val testParams = Seq(
+      // (elasticNetParam, regParam, fitIntercept, standardization)
+      (0.0, 0.21, true, true),
+      (0.0, 0.21, true, false),
+      (0.0, 0.21, false, false),
+      (1.0, 0.21, true, true)
+    )
+
+    for (solver <- Seq("auto", "l-bfgs", "normal");
+         (elasticNetParam, regParam, fitIntercept, standardization) <- testParams) {
+      val estimator = new LinearRegression()
+        .setFitIntercept(fitIntercept)
+        .setStandardization(standardization)
+        .setRegParam(regParam)
+        .setElasticNetParam(elasticNetParam)
+      MLTestingUtils.testArbitrarilyScaledWeights[LinearRegressionModel, LinearRegression](
+        datasetWithStrongNoise.as[LabeledPoint], estimator, modelEquals)
+      MLTestingUtils.testOutliersWithSmallWeights[LinearRegressionModel, LinearRegression](
+        datasetWithStrongNoise.as[LabeledPoint], estimator, numClasses, modelEquals)
+      MLTestingUtils.testOversamplingVsWeighting[LinearRegressionModel, LinearRegression](
+        datasetWithStrongNoise.as[LabeledPoint], estimator, modelEquals, seed)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6a475ae4/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
index 472a5af..d219c42 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
@@ -18,15 +18,15 @@
 package org.apache.spark.ml.util
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.{Estimator, Model}
-import org.apache.spark.ml.attribute.NominalAttribute
+import org.apache.spark.ml._
 import org.apache.spark.ml.evaluation.Evaluator
-import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.feature.{Instance, LabeledPoint}
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol, HasWeightCol}
 import org.apache.spark.ml.recommendation.{ALS, ALSModel}
 import org.apache.spark.ml.tree.impl.TreeTests
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
@@ -182,46 +182,79 @@ object MLTestingUtils extends SparkFunSuite {
       .toMap
   }
 
-  def genClassificationInstancesWithWeightedOutliers(
-      spark: SparkSession,
-      numClasses: Int,
-      numInstances: Int): DataFrame = {
-    val data = Array.tabulate[Instance](numInstances) { i =>
-      val feature = i % numClasses
-      if (i < numInstances / 3) {
-        // give large weights to minority of data with 1 to 1 mapping feature to label
-        Instance(feature, 1.0, Vectors.dense(feature))
-      } else {
-        // give small weights to majority of data points with reverse mapping
-        Instance(numClasses - feature - 1, 0.01, Vectors.dense(feature))
-      }
-    }
-    val labelMeta =
-      NominalAttribute.defaultAttr.withName("label").withNumValues(numClasses).toMetadata()
-    spark.createDataFrame(data).select(col("label").as("label", labelMeta), col("weight"),
-      col("features"))
-  }
-
+  /**
+   * Given a DataFrame, generate two output DataFrames: one having the original rows oversampled
+   * an integer number of times, and one having the original rows but with a column of weights
+   * proportional to the number of oversampled instances in the oversampled DataFrames.
+   */
   def genEquivalentOversampledAndWeightedInstances(
-      data: DataFrame,
-      labelCol: String,
-      featuresCol: String,
-      seed: Long): (DataFrame, DataFrame) = {
+      data: Dataset[LabeledPoint],
+      seed: Long): (Dataset[Instance], Dataset[Instance]) = {
     import data.sparkSession.implicits._
-    val rng = scala.util.Random
-    rng.setSeed(seed)
+    val rng = new scala.util.Random(seed)
     val sample: () => Int = () => rng.nextInt(10) + 1
     val sampleUDF = udf(sample)
-    val rawData = data.select(labelCol, featuresCol).withColumn("samples", sampleUDF())
-    val overSampledData = rawData.rdd.flatMap {
-      case Row(label: Double, features: Vector, n: Int) =>
-        Iterator.fill(n)(Instance(label, 1.0, features))
-    }.toDF()
+    val rawData = data.select("label", "features").withColumn("samples", sampleUDF())
+    val overSampledData = rawData.rdd.flatMap { case Row(label: Double, features: Vector, n: Int) =>
+      Iterator.fill(n)(Instance(label, 1.0, features))
+    }.toDS()
     rng.setSeed(seed)
-    val weightedData = rawData.rdd.map {
-      case Row(label: Double, features: Vector, n: Int) =>
-        Instance(label, n.toDouble, features)
-    }.toDF()
+    val weightedData = rawData.rdd.map { case Row(label: Double, features: Vector, n: Int) =>
+      Instance(label, n.toDouble, features)
+    }.toDS()
     (overSampledData, weightedData)
   }
+
+  /**
+   * Helper function for testing sample weights. Tests that oversampling each point is equivalent
+   * to assigning a sample weight proportional to the number of samples for each point.
+   */
+  def testOversamplingVsWeighting[M <: Model[M], E <: Estimator[M]](
+      data: Dataset[LabeledPoint],
+      estimator: E with HasWeightCol,
+      modelEquals: (M, M) => Unit,
+      seed: Long): Unit = {
+    val (overSampledData, weightedData) = genEquivalentOversampledAndWeightedInstances(
+      data, seed)
+    val weightedModel = estimator.set(estimator.weightCol, "weight").fit(weightedData)
+    val overSampledModel = estimator.set(estimator.weightCol, "").fit(overSampledData)
+    modelEquals(weightedModel, overSampledModel)
+  }
+
+  /**
+   * Helper function for testing sample weights. Tests that injecting a large number of outliers
+   * with very small sample weights does not affect fitting. The predictor should learn the true
+   * model despite the outliers.
+   */
+  def testOutliersWithSmallWeights[M <: Model[M], E <: Estimator[M]](
+      data: Dataset[LabeledPoint],
+      estimator: E with HasWeightCol,
+      numClasses: Int,
+      modelEquals: (M, M) => Unit): Unit = {
+    import data.sqlContext.implicits._
+    val outlierDS = data.withColumn("weight", lit(1.0)).as[Instance].flatMap {
+      case Instance(l, w, f) =>
+        val outlierLabel = if (numClasses == 0) -l else numClasses - l - 1
+        List.fill(3)(Instance(outlierLabel, 0.0001, f)) ++ List(Instance(l, w, f))
+    }
+    val trueModel = estimator.set(estimator.weightCol, "").fit(data)
+    val outlierModel = estimator.set(estimator.weightCol, "weight").fit(outlierDS)
+    modelEquals(trueModel, outlierModel)
+  }
+
+  /**
+   * Helper function for testing sample weights. Tests that giving constant weights to each data
+   * point yields the same model, regardless of the magnitude of the weight.
+   */
+  def testArbitrarilyScaledWeights[M <: Model[M], E <: Estimator[M]](
+      data: Dataset[LabeledPoint],
+      estimator: E with HasWeightCol,
+      modelEquals: (M, M) => Unit): Unit = {
+    estimator.set(estimator.weightCol, "weight")
+    val models = Seq(0.001, 1.0, 1000.0).map { w =>
+      val df = data.withColumn("weight", lit(w))
+      estimator.fit(df)
+    }
+    models.sliding(2).foreach { case Seq(m1, m2) => modelEquals(m1, m2)}
+  }
 }

