You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2018/03/20 18:14:38 UTC

spark git commit: [SPARK-21898][ML] Feature parity for KolmogorovSmirnovTest in MLlib

Repository: spark
Updated Branches:
  refs/heads/master 5e7bc2ace -> 7f5e8aa26


[SPARK-21898][ML] Feature parity for KolmogorovSmirnovTest in MLlib

## What changes were proposed in this pull request?

Feature parity for KolmogorovSmirnovTest in MLlib.
Implement `DataFrame` interface for `KolmogorovSmirnovTest` in `mllib.stat`.

## How was this patch tested?

Test suite added.

Author: WeichenXu <we...@databricks.com>
Author: jkbradley <jo...@gmail.com>

Closes #19108 from WeichenXu123/ml-ks-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f5e8aa2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f5e8aa2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f5e8aa2

Branch: refs/heads/master
Commit: 7f5e8aa2606b0ee0297ceb6f4603bd368e3b0291
Parents: 5e7bc2a
Author: WeichenXu <we...@databricks.com>
Authored: Tue Mar 20 11:14:34 2018 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Tue Mar 20 11:14:34 2018 -0700

----------------------------------------------------------------------
 .../spark/ml/stat/KolmogorovSmirnovTest.scala   | 113 +++++++++++++++
 .../ml/stat/JavaKolmogorovSmirnovTestSuite.java |  84 +++++++++++
 .../ml/stat/KolmogorovSmirnovTestSuite.scala    | 140 +++++++++++++++++++
 3 files changed, 337 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7f5e8aa2/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
new file mode 100644
index 0000000..8d80e77
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat
+
+import scala.annotation.varargs
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.java.function.Function
+import org.apache.spark.ml.util.SchemaUtils
+import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.functions.col
+
+/**
+ * :: Experimental ::
+ *
+ * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
+ * continuous distribution. By comparing the largest difference between the empirical cumulative
+ * distribution of the sample data and the theoretical distribution, we can provide a test for
+ * the null hypothesis that the sample data comes from that theoretical distribution.
+ * For more information on KS Test:
+ * @see <a href="https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test">
+ * Kolmogorov-Smirnov test (Wikipedia)</a>
+ */
+@Experimental
+@Since("2.4.0")
+object KolmogorovSmirnovTest {
+
+  /** Used to construct output schema of test */
+  private case class KolmogorovSmirnovTestResult(
+      pValue: Double,
+      statistic: Double)
+
+  private def getSampleRDD(dataset: DataFrame, sampleCol: String): RDD[Double] = {
+    SchemaUtils.checkNumericType(dataset.schema, sampleCol)
+    import dataset.sparkSession.implicits._
+    dataset.select(col(sampleCol).cast("double")).as[Double].rdd
+  }
+
+  /**
+   * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
+   * continuous distribution. By comparing the largest difference between the empirical cumulative
+   * distribution of the sample data and the theoretical distribution, we can provide a test for
+   * the null hypothesis that the sample data comes from that theoretical distribution.
+   *
+   * @param dataset a `DataFrame` containing the sample of data to test
+   * @param sampleCol Name of sample column in dataset, of any numerical type
+   * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value
+   * @return DataFrame containing the test result for the input sampled data.
+   *         This DataFrame will contain a single Row with the following fields:
+   *          - `pValue: Double`
+   *          - `statistic: Double`
+   */
+  @Since("2.4.0")
+  def test(dataset: DataFrame, sampleCol: String, cdf: Double => Double): DataFrame = {
+    val spark = dataset.sparkSession
+
+    val rdd = getSampleRDD(dataset, sampleCol)
+    val testResult = OldStatistics.kolmogorovSmirnovTest(rdd, cdf)
+    spark.createDataFrame(Seq(KolmogorovSmirnovTestResult(
+      testResult.pValue, testResult.statistic)))
+  }
+
+  /**
+   * Java-friendly version of `test(dataset: DataFrame, sampleCol: String, cdf: Double => Double)`
+   */
+  @Since("2.4.0")
+  def test(dataset: DataFrame, sampleCol: String,
+    cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = {
+    test(dataset, sampleCol, (x: Double) => cdf.call(x))
+  }
+
+  /**
+   * Convenience function to conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability
+   * distribution equality. Currently supports the normal distribution, taking as parameters
+   * the mean and standard deviation.
+   *
+   * @param dataset a `DataFrame` containing the sample of data to test
+   * @param sampleCol Name of sample column in dataset, of any numerical type
+   * @param distName a `String` name for a theoretical distribution, currently only supports "norm".
+   * @param params `Double*` specifying the parameters to be used for the theoretical distribution
+   * @return DataFrame containing the test result for the input sampled data.
+   *         This DataFrame will contain a single Row with the following fields:
+   *          - `pValue: Double`
+   *          - `statistic: Double`
+   */
+  @Since("2.4.0")
+  @varargs
+  def test(dataset: DataFrame, sampleCol: String, distName: String, params: Double*): DataFrame = {
+    val spark = dataset.sparkSession
+
+    val rdd = getSampleRDD(dataset, sampleCol)
+    val testResult = OldStatistics.kolmogorovSmirnovTest(rdd, distName, params: _*)
+    spark.createDataFrame(Seq(KolmogorovSmirnovTestResult(
+      testResult.pValue, testResult.statistic)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7f5e8aa2/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java b/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java
new file mode 100644
index 0000000..021272d
--- /dev/null
+++ b/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.math3.distribution.NormalDistribution;
+import org.apache.spark.ml.linalg.VectorUDT;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.Test;
+
+import org.apache.spark.SharedSparkSession;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+
+public class JavaKolmogorovSmirnovTestSuite extends SharedSparkSession {
+
+  private transient Dataset<Row> dataset;
+
+  @Override
+  public void setUp() throws IOException {
+    super.setUp();
+    List<java.lang.Double> points = Arrays.asList(0.1, 1.1, 10.1, -1.1);
+
+    dataset = spark.createDataset(points, Encoders.DOUBLE()).toDF("sample");
+  }
+
+  @Test
+  public void testKSTestCDF() {
+    // Create theoretical distributions
+    NormalDistribution stdNormalDist = new NormalDistribution(0, 1);
+
+    // set seeds
+    Long seed = 10L;
+    stdNormalDist.reseedRandomGenerator(seed);
+    Function<Double, Double> stdNormalCDF = (x) -> stdNormalDist.cumulativeProbability(x);
+
+    double pThreshold = 0.05;
+
+    // Comparing the small fixed sample to a standard normal distribution
+    Row results = KolmogorovSmirnovTest
+      .test(dataset, "sample", stdNormalCDF).head();
+    double pValue1 = results.getDouble(0);
+    // Cannot reject null hypothesis
+    assert(pValue1 > pThreshold);
+  }
+
+  @Test
+  public void testKSTestNamedDistribution() {
+    double pThreshold = 0.05;
+
+    // Comparing the small fixed sample to a standard normal distribution
+    Row results = KolmogorovSmirnovTest
+            .test(dataset, "sample", "norm", 0.0, 1.0).head();
+    double pValue1 = results.getDouble(0);
+    // Cannot reject null hypothesis
+    assert(pValue1 > pThreshold);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7f5e8aa2/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala
new file mode 100644
index 0000000..1312de3
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, NormalDistribution,
+  RealDistribution, UniformRealDistribution}
+import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => Math3KSTest}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.util.DefaultReadWriteTest
+import org.apache.spark.ml.util.TestingUtils._
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.Row
+
+class KolmogorovSmirnovTestSuite
+  extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+
+  import testImplicits._
+
+  def apacheCommonMath3EquivalenceTest(
+    sampleDist: RealDistribution,
+    theoreticalDist: RealDistribution,
+    theoreticalDistByName: (String, Array[Double]),
+    rejectNullHypothesis: Boolean): Unit = {
+
+    // set seeds
+    val seed = 10L
+    sampleDist.reseedRandomGenerator(seed)
+    if (theoreticalDist != null) {
+      theoreticalDist.reseedRandomGenerator(seed)
+    }
+
+    // Sample data from the distributions and parallelize it
+    val n = 100000
+    val sampledArray = sampleDist.sample(n)
+    val sampledDF = sc.parallelize(sampledArray, 10).toDF("sample")
+
+    // Use the local Apache Commons Math KS test to verify calculations
+    val ksTest = new Math3KSTest()
+    val pThreshold = 0.05
+
+    // Compare the sample against the theoretical distribution (given as a CDF or by name)
+    val Row(pValue1: Double, statistic1: Double) =
+      if (theoreticalDist != null) {
+        val cdf = (x: Double) => theoreticalDist.cumulativeProbability(x)
+        KolmogorovSmirnovTest.test(sampledDF, "sample", cdf).head()
+      } else {
+        KolmogorovSmirnovTest.test(sampledDF, "sample",
+          theoreticalDistByName._1,
+          theoreticalDistByName._2: _*
+        ).head()
+      }
+    val theoreticalDistMath3 = if (theoreticalDist == null) {
+      assert(theoreticalDistByName._1 == "norm")
+      val params = theoreticalDistByName._2
+      new NormalDistribution(params(0), params(1))
+    } else {
+      theoreticalDist
+    }
+    val referenceStat1 = ksTest.kolmogorovSmirnovStatistic(theoreticalDistMath3, sampledArray)
+    val referencePVal1 = 1 - ksTest.cdf(referenceStat1, n)
+    // Verify against the Apache Commons Math KS test
+    assert(statistic1 ~== referenceStat1 relTol 1e-4)
+    assert(pValue1 ~== referencePVal1 relTol 1e-4)
+
+    if (rejectNullHypothesis) {
+      assert(pValue1 < pThreshold)
+    } else {
+      assert(pValue1 > pThreshold)
+    }
+  }
+
+  test("1 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") {
+    // Create theoretical distributions
+    val stdNormalDist = new NormalDistribution(0.0, 1.0)
+    val expDist = new ExponentialDistribution(0.6)
+    val uniformDist = new UniformRealDistribution(0.0, 1.0)
+    val expDist2 = new ExponentialDistribution(0.2)
+    val stdNormByName = Tuple2("norm", Array(0.0, 1.0))
+
+    apacheCommonMath3EquivalenceTest(stdNormalDist, null, stdNormByName, false)
+    apacheCommonMath3EquivalenceTest(expDist, null, stdNormByName, true)
+    apacheCommonMath3EquivalenceTest(uniformDist, null, stdNormByName, true)
+    apacheCommonMath3EquivalenceTest(expDist, expDist2, null, true)
+  }
+
+  test("1 sample Kolmogorov-Smirnov test: R implementation equivalence") {
+    /*
+      Comparing results with R's implementation of Kolmogorov-Smirnov for 1 sample
+      > sessionInfo()
+      R version 3.2.0 (2015-04-16)
+      Platform: x86_64-apple-darwin13.4.0 (64-bit)
+      > set.seed(20)
+      > v <- rnorm(20)
+      > v
+       [1]  1.16268529 -0.58592447  1.78546500 -1.33259371 -0.44656677  0.56960612
+       [7] -2.88971761 -0.86901834 -0.46170268 -0.55554091 -0.02013537 -0.15038222
+      [13] -0.62812676  1.32322085 -1.52135057 -0.43742787  0.97057758  0.02822264
+      [19] -0.08578219  0.38921440
+      > ks.test(v, pnorm, alternative = "two.sided")
+
+               One-sample Kolmogorov-Smirnov test
+
+      data:  v
+      D = 0.18874, p-value = 0.4223
+      alternative hypothesis: two-sided
+    */
+
+    val rKSStat = 0.18874
+    val rKSPVal = 0.4223
+    val rData = sc.parallelize(
+      Array(
+        1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
+        -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
+        -0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063,
+        -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
+        0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942
+      )
+    ).toDF("sample")
+    val Row(pValue: Double, statistic: Double) = KolmogorovSmirnovTest
+      .test(rData, "sample", "norm", 0, 1).head()
+    assert(statistic ~== rKSStat relTol 1e-4)
+    assert(pValue ~== rKSPVal relTol 1e-4)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org