You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2018/03/20 18:14:38 UTC
spark git commit: [SPARK-21898][ML] Feature parity for
KolmogorovSmirnovTest in MLlib
Repository: spark
Updated Branches:
refs/heads/master 5e7bc2ace -> 7f5e8aa26
[SPARK-21898][ML] Feature parity for KolmogorovSmirnovTest in MLlib
## What changes were proposed in this pull request?
Feature parity for KolmogorovSmirnovTest in MLlib.
Implement a `DataFrame`-based interface for `KolmogorovSmirnovTest` in `ml.stat`, matching the existing RDD-based API in `mllib.stat`.
## How was this patch tested?
Added Scala and Java test suites.
Author: WeichenXu <we...@databricks.com>
Author: jkbradley <jo...@gmail.com>
Closes #19108 from WeichenXu123/ml-ks-test.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f5e8aa2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f5e8aa2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f5e8aa2
Branch: refs/heads/master
Commit: 7f5e8aa2606b0ee0297ceb6f4603bd368e3b0291
Parents: 5e7bc2a
Author: WeichenXu <we...@databricks.com>
Authored: Tue Mar 20 11:14:34 2018 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Tue Mar 20 11:14:34 2018 -0700
----------------------------------------------------------------------
.../spark/ml/stat/KolmogorovSmirnovTest.scala | 113 +++++++++++++++
.../ml/stat/JavaKolmogorovSmirnovTestSuite.java | 84 +++++++++++
.../ml/stat/KolmogorovSmirnovTestSuite.scala | 140 +++++++++++++++++++
3 files changed, 337 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7f5e8aa2/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
new file mode 100644
index 0000000..8d80e77
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat
+
+import scala.annotation.varargs
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.java.function.Function
+import org.apache.spark.ml.util.SchemaUtils
+import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.col
+
+/**
+ * :: Experimental ::
+ *
+ * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
+ * continuous distribution. By comparing the largest difference between the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can provide a test for the
+ * null hypothesis that the sample data comes from that theoretical distribution.
+ * For more information on KS Test:
+ * @see <a href="https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test">
+ * Kolmogorov-Smirnov test (Wikipedia)</a>
+ */
+@Experimental
+@Since("2.4.0")
+object KolmogorovSmirnovTest {
+
+  /** Used to construct the output schema of the test: one row of (pValue, statistic). */
+  private case class KolmogorovSmirnovTestResult(
+      pValue: Double,
+      statistic: Double)
+
+  /**
+   * Extracts `sampleCol` as an `RDD[Double]`, casting the column to double.
+   * The column type is validated with `SchemaUtils.checkNumericType` first.
+   * NOTE(review): null values in `sampleCol` are not filtered here.
+   */
+  private def getSampleRDD(dataset: DataFrame, sampleCol: String): RDD[Double] = {
+    SchemaUtils.checkNumericType(dataset.schema, sampleCol)
+    import dataset.sparkSession.implicits._
+    dataset.select(col(sampleCol).cast("double")).as[Double].rdd
+  }
+
+  /**
+   * Packs a p-value and statistic into the single-row result `DataFrame` returned by every
+   * `test` overload, using the session of the input dataset.
+   */
+  private def toResultDataFrame(
+      dataset: Dataset[_],
+      pValue: Double,
+      statistic: Double): DataFrame = {
+    dataset.sparkSession.createDataFrame(Seq(KolmogorovSmirnovTestResult(pValue, statistic)))
+  }
+
+  /**
+   * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
+   * continuous distribution. By comparing the largest difference between the empirical cumulative
+   * distribution of the sample data and the theoretical distribution we can provide a test for the
+   * null hypothesis that the sample data comes from that theoretical distribution.
+   *
+   * @param dataet a `Dataset` containing the sample of data to test
+   * @param sampleCol Name of sample column in dataset, of any numerical type
+   * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value
+   * @return DataFrame containing the test result for the input sampled data.
+   *         This DataFrame will contain a single Row with the following fields:
+   *          - `pValue: Double`
+   *          - `statistic: Double`
+   */
+  @Since("2.4.0")
+  def test(dataset: Dataset[_], sampleCol: String, cdf: Double => Double): DataFrame = {
+    val sample = getSampleRDD(dataset.toDF(), sampleCol)
+    val oldResult = OldStatistics.kolmogorovSmirnovTest(sample, cdf)
+    toResultDataFrame(dataset, oldResult.pValue, oldResult.statistic)
+  }
+
+  /**
+   * Java-friendly version of `test(dataset: Dataset[_], sampleCol: String, cdf: Double => Double)`.
+   * A null value returned by `cdf` results in a `NullPointerException`.
+   */
+  @Since("2.4.0")
+  def test(
+      dataset: Dataset[_],
+      sampleCol: String,
+      cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = {
+    // Unbox explicitly so the lambda is typed `Double => Double` and unambiguously selects the
+    // Scala-function overload above instead of relying on implicit conversions.
+    test(dataset, sampleCol, (x: Double) => cdf.call(x).doubleValue())
+  }
+
+  /**
+   * Convenience function to conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability
+   * distribution equality. Currently supports the normal distribution, taking as parameters
+   * the mean and standard deviation.
+   *
+   * @param dataset a `Dataset` containing the sample of data to test
+   * @param sampleCol Name of sample column in dataset, of any numerical type
+   * @param distName a `String` name for a theoretical distribution, currently only support "norm".
+   * @param params `Double*` specifying the parameters to be used for the theoretical distribution
+   * @return DataFrame containing the test result for the input sampled data.
+   *         This DataFrame will contain a single Row with the following fields:
+   *          - `pValue: Double`
+   *          - `statistic: Double`
+   */
+  @Since("2.4.0")
+  @varargs
+  def test(
+      dataset: Dataset[_],
+      sampleCol: String,
+      distName: String,
+      params: Double*): DataFrame = {
+    val sample = getSampleRDD(dataset.toDF(), sampleCol)
+    val oldResult = OldStatistics.kolmogorovSmirnovTest(sample, distName, params: _*)
+    toResultDataFrame(dataset, oldResult.pValue, oldResult.statistic)
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7f5e8aa2/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java b/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java
new file mode 100644
index 0000000..021272d
--- /dev/null
+++ b/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.math3.distribution.NormalDistribution;
+import org.apache.spark.ml.linalg.VectorUDT;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.Test;
+
+import org.apache.spark.SharedSparkSession;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+
+/**
+ * Java API tests for {@link KolmogorovSmirnovTest}, covering both the Java-friendly CDF overload
+ * and the {@code @varargs} named-distribution overload on a small fixed sample.
+ */
+public class JavaKolmogorovSmirnovTestSuite extends SharedSparkSession {
+
+  /** Significance level used to decide whether the null hypothesis is rejected. */
+  private static final double P_THRESHOLD = 0.05;
+
+  private transient Dataset<Row> dataset;
+
+  @Override
+  public void setUp() throws IOException {
+    super.setUp();
+    // Small fixed sample in a single numeric column named "sample". The tests below expect it
+    // NOT to reject a standard normal N(0, 1) at the 5% level.
+    List<Double> points = Arrays.asList(0.1, 1.1, 10.1, -1.1);
+    dataset = spark.createDataset(points, Encoders.DOUBLE()).toDF("sample");
+  }
+
+  @Test
+  public void testKSTestCDF() {
+    // Theoretical distribution N(0, 1), passed as a Java lambda through the Function overload.
+    // Only its CDF is evaluated, so no random seed is needed.
+    NormalDistribution stdNormalDist = new NormalDistribution(0, 1);
+    Function<Double, Double> stdNormalCDF = x -> stdNormalDist.cumulativeProbability(x);
+
+    Row result = KolmogorovSmirnovTest.test(dataset, "sample", stdNormalCDF).head();
+    assertCannotRejectNull(result);
+  }
+
+  @Test
+  public void testKSTestNamedDistribution() {
+    // Same hypothesis as testKSTestCDF, expressed through the ("norm", mean, stddev)
+    // varargs overload.
+    Row result = KolmogorovSmirnovTest.test(dataset, "sample", "norm", 0.0, 1.0).head();
+    assertCannotRejectNull(result);
+  }
+
+  /**
+   * Asserts that a KS result row (field 0 = pValue, field 1 = statistic) does not reject the null
+   * hypothesis at {@link #P_THRESHOLD}. Uses JUnit assertions rather than the Java {@code assert}
+   * keyword, which is a no-op unless the JVM runs with {@code -ea}.
+   */
+  private static void assertCannotRejectNull(Row result) {
+    double pValue = result.getDouble(0);
+    double statistic = result.getDouble(1);
+    org.junit.Assert.assertTrue("KS statistic out of [0, 1]: " + statistic,
+      statistic >= 0.0 && statistic <= 1.0);
+    org.junit.Assert.assertTrue("Expected p-value > " + P_THRESHOLD + " but got " + pValue,
+      pValue > P_THRESHOLD);
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7f5e8aa2/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala
new file mode 100644
index 0000000..1312de3
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, NormalDistribution,
+ RealDistribution, UniformRealDistribution}
+import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => Math3KSTest}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.util.DefaultReadWriteTest
+import org.apache.spark.ml.util.TestingUtils._
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.Row
+
+class KolmogorovSmirnovTestSuite
+  extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+
+  import testImplicits._
+
+  /**
+   * Draws a seeded sample from `sampleDist` and runs the Spark KS test against a theoretical
+   * distribution. It then checks the statistic and p-value against Apache Commons Math's local
+   * implementation, and checks the expected accept/reject decision at the 5% level.
+   *
+   * @param sampleDist distribution the test data is drawn from
+   * @param theoreticalDist `Left(dist)` exercises the CDF-function API against `dist`;
+   *                        `Right((name, params))` exercises the named-distribution API
+   *                        (only "norm" with (mean, stddev) is supported by this helper)
+   * @param rejectNullHypothesis whether the p-value is expected to fall below 0.05
+   */
+  private def apacheCommonMath3EquivalenceTest(
+      sampleDist: RealDistribution,
+      theoreticalDist: Either[RealDistribution, (String, Array[Double])],
+      rejectNullHypothesis: Boolean): Unit = {
+    // Fix the seed so the sampled data, and therefore the asserted decision, is reproducible.
+    // The theoretical distribution is only used through its CDF and needs no seed.
+    sampleDist.reseedRandomGenerator(10L)
+
+    // Sample data from the distribution and parallelize it
+    val n = 100000
+    val sampledArray = sampleDist.sample(n)
+    val sampledDF = sc.parallelize(sampledArray, 10).toDF("sample")
+
+    // Run the Spark test, and build the matching Commons Math distribution for the reference.
+    val (sparkResult, referenceDist) = theoreticalDist match {
+      case Left(dist) =>
+        val cdf = (x: Double) => dist.cumulativeProbability(x)
+        (KolmogorovSmirnovTest.test(sampledDF, "sample", cdf).head(), dist)
+      case Right((distName, params)) =>
+        assert(distName == "norm", s"Unsupported distribution name in test: $distName")
+        val result = KolmogorovSmirnovTest.test(sampledDF, "sample", distName, params: _*).head()
+        (result, new NormalDistribution(params(0), params(1)))
+    }
+    val Row(pValue: Double, statistic: Double) = sparkResult
+
+    // Reference values from Apache Commons Math's local (single-machine) KS implementation
+    val math3KSTest = new Math3KSTest()
+    val referenceStat = math3KSTest.kolmogorovSmirnovStatistic(referenceDist, sampledArray)
+    val referencePValue = 1 - math3KSTest.cdf(referenceStat, n)
+    assert(statistic ~== referenceStat relTol 1e-4)
+    assert(pValue ~== referencePValue relTol 1e-4)
+
+    val pThreshold = 0.05
+    if (rejectNullHypothesis) {
+      assert(pValue < pThreshold)
+    } else {
+      assert(pValue > pThreshold)
+    }
+  }
+
+  test("1 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") {
+    // Create theoretical distributions
+    val stdNormalDist = new NormalDistribution(0.0, 1.0)
+    val expDist = new ExponentialDistribution(0.6)
+    val uniformDist = new UniformRealDistribution(0.0, 1.0)
+    val expDist2 = new ExponentialDistribution(0.2)
+    val stdNormByName = ("norm", Array(0.0, 1.0))
+
+    // Named-distribution API: only the standard-normal sample should be accepted as N(0, 1).
+    apacheCommonMath3EquivalenceTest(stdNormalDist, Right(stdNormByName),
+      rejectNullHypothesis = false)
+    apacheCommonMath3EquivalenceTest(expDist, Right(stdNormByName), rejectNullHypothesis = true)
+    apacheCommonMath3EquivalenceTest(uniformDist, Right(stdNormByName),
+      rejectNullHypothesis = true)
+    // CDF-function API: exponential sample (mean 0.6) vs. exponential distribution (mean 0.2).
+    apacheCommonMath3EquivalenceTest(expDist, Left(expDist2), rejectNullHypothesis = true)
+  }
+
+  test("1 sample Kolmogorov-Smirnov test: R implementation equivalence") {
+    /*
+      Comparing results with R's implementation of Kolmogorov-Smirnov for 1 sample
+      > sessionInfo()
+      R version 3.2.0 (2015-04-16)
+      Platform: x86_64-apple-darwin13.4.0 (64-bit)
+      > set.seed(20)
+      > v <- rnorm(20)
+      > v
+       [1]  1.16268529 -0.58592447  1.78546500 -1.33259371 -0.44656677  0.56960612
+       [7] -2.88971761 -0.86901834 -0.46170268 -0.55554091 -0.02013537 -0.15038222
+      [13] -0.62812676  1.32322085 -1.52135057 -0.43742787  0.97057758  0.02822264
+      [19] -0.08578219  0.38921440
+      > ks.test(v, pnorm, alternative = "two.sided")
+
+        One-sample Kolmogorov-Smirnov test
+
+      data:  v
+      D = 0.18874, p-value = 0.4223
+      alternative hypothesis: two-sided
+    */
+
+    val rKSStat = 0.18874
+    val rKSPVal = 0.4223
+    val rData = sc.parallelize(
+      Array(
+        1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
+        -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
+        -0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063,
+        -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
+        0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942
+      )
+    ).toDF("sample")
+    val Row(pValue: Double, statistic: Double) = KolmogorovSmirnovTest
+      .test(rData, "sample", "norm", 0.0, 1.0).head()
+    assert(statistic ~== rKSStat relTol 1e-4)
+    assert(pValue ~== rKSPVal relTol 1e-4)
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org