Posted to commits@spark.apache.org by me...@apache.org on 2015/07/11 05:55:54 UTC

spark git commit: [SPARK-8598] [MLLIB] Implementation of 1-sample, two-sided, Kolmogorov Smirnov Test for RDDs

Repository: spark
Updated Branches:
  refs/heads/master 6e1c7e279 -> 9c5075775


[SPARK-8598] [MLLIB] Implementation of 1-sample, two-sided, Kolmogorov Smirnov Test for RDDs

This contribution is my original work and I license it to the project under its open source license.

Author: jose.cambronero <jo...@cloudera.com>

Closes #6994 from josepablocam/master and squashes the following commits:

bbb30b1 [jose.cambronero] renamed KSTestResult to KolmogorovSmirnovTestResult, to stay consistent with method name
0d0c201 [jose.cambronero] kstTest -> kolmogorovSmirnovTest in statistics.md
1f56371 [jose.cambronero] changed ksTest in public API to kolmogorovSmirnovTest for clarity
a48ae7b [jose.cambronero] refactor code to account for serializable RealDistribution. Reuse testOneSample( _, cdf)
1bb44bd [jose.cambronero]  style and doc changes. Factored out ks test into 2 separate tests
2ec2aa6 [jose.cambronero] initialize to stdnormal when no params passed (and log). Change unit tests to approximate equivalence rather than strict
a4bc0c7 [jose.cambronero] changed ksTest(data, distName) to ksTest(data, distName, params*) after api discussions. Changed tests and docs accordingly
7e66f57 [jose.cambronero] copied implementation note to public api docs, and added @see for links to wiki info
e760ebd [jose.cambronero] line length changes to fit style check
3288e42 [jose.cambronero] addressed style changes, correctness change to simpler approach, and fixed edge case for foldLeft in searchOneSampleCandidates when a partition is empty
9026895 [jose.cambronero] addressed style changes, correctness change to simpler approach, and fixed edge case for foldLeft in searchOneSampleCandidates when a partition is empty
1226b30 [jose.cambronero] reindent multi-line lambdas, prior interpretation of style guide was wrong on my part
9c0f1af [jose.cambronero] additional style changes incorporated and added documentation to mllib statistics docs
3f81ad2 [jose.cambronero] renamed ks1 sample test for clarity
992293b [jose.cambronero] Style changes as per comments and added implementation note explaining the distributed approach.
6a4784f [jose.cambronero] specified what distributions are available for the convenience method ksTest(data, name) (solely standard normal)
4b8ba61 [jose.cambronero] fixed off by 1/N in cases when post-constant adjustment ecdf is above cdf, but prior to adj it was below
0b5e8ec [jose.cambronero] changed KS one sample test to perform just 1 distributed pass (in addition to the sorting pass), operates on each partition separately. Implementation of Sandy Ryza's algorithm
16b5c4c [jose.cambronero] renamed dat to data and eliminated recalc of RDD size by sharing as argument between empirical and evalOneSampleP
c18dc66 [jose.cambronero] removed ksTestOpt from API and changed comments in HypothesisTestSuite accordingly
f6951b6 [jose.cambronero] changed style and some comments based on feedback from pull request
b9cff3a [jose.cambronero] made small changes to pass style check
ce8e9a1 [jose.cambronero] added kstest testing in HypothesisTestSuite
4da189b [jose.cambronero] added user facing ks test functions
c659ea1 [jose.cambronero] created KS test class
13dfe4d [jose.cambronero] created test result class for ks test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c507577
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c507577
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c507577

Branch: refs/heads/master
Commit: 9c5075775d741eacbeeb2df77ea30611356b6e1a
Parents: 6e1c7e2
Author: jose.cambronero <jo...@cloudera.com>
Authored: Fri Jul 10 20:55:45 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Fri Jul 10 20:55:45 2015 -0700

----------------------------------------------------------------------
 docs/mllib-statistics.md                        |  37 +++-
 .../apache/spark/mllib/stat/Statistics.scala    |  40 +++-
 .../mllib/stat/test/KolmogorovSmirnovTest.scala | 194 +++++++++++++++++++
 .../spark/mllib/stat/test/TestResult.scala      |  17 ++
 .../spark/mllib/stat/HypothesisTestSuite.scala  | 101 ++++++++++
 5 files changed, 387 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9c507577/docs/mllib-statistics.md
----------------------------------------------------------------------
diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md
index 887eae7..de5d648 100644
--- a/docs/mllib-statistics.md
+++ b/docs/mllib-statistics.md
@@ -283,7 +283,7 @@ approxSample = data.sampleByKey(False, fractions);
 
 Hypothesis testing is a powerful tool in statistics to determine whether a result is statistically 
 significant, whether this result occurred by chance or not. MLlib currently supports Pearson's 
-chi-squared ( $\chi^2$) tests for goodness of fit and independence. The input data types determine 
+chi-squared ( $\chi^2$) tests for goodness of fit and independence. The input data types determine
 whether the goodness of fit or the independence test is conducted. The goodness of fit test requires 
 an input type of `Vector`, whereas the independence test requires a `Matrix` as input.
 
@@ -422,6 +422,41 @@ for i, result in enumerate(featureTestResults):
 
 </div>
 
+Additionally, MLlib provides a 1-sample, 2-sided implementation of the Kolmogorov-Smirnov (KS)
+test for equality of probability distributions. The user can test the null hypothesis that their
+sample is drawn from a given theoretical distribution, either by providing the name of that
+distribution (currently only the normal distribution is supported) and its parameters, or by
+providing a function to calculate the cumulative distribution according to the theoretical
+distribution. If the user tests against the normal distribution (`distName="norm"`) but does not
+provide distribution parameters, the test initializes to the standard normal distribution and
+logs an appropriate message.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+[`Statistics`](api/scala/index.html#org.apache.spark.mllib.stat.Statistics$) provides methods to
+run a 1-sample, 2-sided Kolmogorov-Smirnov test. The following example demonstrates how to run
+and interpret the hypothesis tests.
+
+{% highlight scala %}
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.rdd.RDD
+
+val data: RDD[Double] = ... // an RDD of sample data
+
+// run a KS test for the sample versus a standard normal distribution
+val testResult = Statistics.kolmogorovSmirnovTest(data, "norm", 0, 1)
+println(testResult) // summary of the test including the p-value, test statistic,
+                    // and null hypothesis; if our p-value indicates significance,
+                    // we can reject the null hypothesis
+
+// perform a KS test using a cumulative distribution function of our making
+val myCDF: Double => Double = ...
+val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)
+{% endhighlight %}
+</div>
+</div>
+
+
 ## Random data generation
 
 Random data generation is useful for randomized algorithms, prototyping, and performance testing.
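
A note for readers trying the new example above: the snippet elides the sample data and the custom
CDF. A minimal, self-contained variant is sketched below; it assumes an active SparkContext `sc`
(as in the shell), and the sample values and the Exponential(0.5) CDF are purely illustrative, not
part of this commit.

  import org.apache.commons.math3.distribution.ExponentialDistribution

  import org.apache.spark.mllib.stat.Statistics
  import org.apache.spark.rdd.RDD

  // hypothetical sample data; any RDD[Double] will do
  val data: RDD[Double] = sc.parallelize(Seq(0.1, 0.15, 0.2, 0.3, 0.25, 0.5, 0.9, 1.2))

  // name-based variant: test against N(0, 1)
  val testResult = Statistics.kolmogorovSmirnovTest(data, "norm", 0, 1)
  println(testResult) // prints the p-value, test statistic, and null hypothesis

  // CDF-based variant: build the commons-math distribution inside the function,
  // mirroring the approach taken in the test suite below
  val myCDF: Double => Double =
    (x: Double) => new ExponentialDistribution(0.5).cumulativeProbability(x)
  val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)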

http://git-wip-us.apache.org/repos/asf/spark/blob/9c507577/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
index 900007e..9033202 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
@@ -17,13 +17,16 @@
 
 package org.apache.spark.mllib.stat
 
+import scala.annotation.varargs
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.linalg.distributed.RowMatrix
 import org.apache.spark.mllib.linalg.{Matrix, Vector}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.stat.correlation.Correlations
-import org.apache.spark.mllib.stat.test.{ChiSqTest, ChiSqTestResult}
+import org.apache.spark.mllib.stat.test.{ChiSqTest, ChiSqTestResult, KolmogorovSmirnovTest,
+  KolmogorovSmirnovTestResult}
 import org.apache.spark.rdd.RDD
 
 /**
@@ -158,4 +161,39 @@ object Statistics {
   def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = {
     ChiSqTest.chiSquaredFeatures(data)
   }
+
+  /**
+   * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
+   * continuous distribution. By computing the largest difference between the empirical cumulative
+   * distribution of the sample data and the theoretical distribution, we can provide a test of
+   * the null hypothesis that the sample data comes from that theoretical distribution.
+   * For more information on KS Test:
+   * @see [[https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test]]
+   *
+   * @param data an `RDD[Double]` containing the sample of data to test
+   * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value
+   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] object containing test
+   *        statistic, p-value, and null hypothesis.
+   */
+  def kolmogorovSmirnovTest(data: RDD[Double], cdf: Double => Double)
+    : KolmogorovSmirnovTestResult = {
+    KolmogorovSmirnovTest.testOneSample(data, cdf)
+  }
+
+  /**
+   * Convenience function to conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability
+   * distribution equality. Currently supports the normal distribution (`distName = "norm"`),
+   * taking as parameters the mean and standard deviation.
+   * @param data an `RDD[Double]` containing the sample of data to test
+   * @param distName a `String` name for a theoretical distribution
+   * @param params `Double*` specifying the parameters to be used for the theoretical distribution
+   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] object containing test
+   *        statistic, p-value, and null hypothesis.
+   */
+  @varargs
+  def kolmogorovSmirnovTest(data: RDD[Double], distName: String, params: Double*)
+    : KolmogorovSmirnovTestResult = {
+    KolmogorovSmirnovTest.testOneSample(data, distName, params: _*)
+  }
 }
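
For intuition on what these entry points compute: the KS statistic is the supremum over x of
|empirical CDF(x) - CDF(x)|, and for a finite sample that supremum is attained at one of the two
one-sided gaps around a sample point, where the empirical CDF jumps from i/n to (i+1)/n. A plain
local sketch (illustrative only; the distributed version in KolmogorovSmirnovTest.scala below is
the real implementation):

  // local reference computation of the one-sample KS statistic
  def ksStatLocal(sample: Array[Double], cdf: Double => Double): Double = {
    val sorted = sample.sorted
    val n = sorted.length.toDouble
    sorted.zipWithIndex.map { case (x, i) =>
      val c = cdf(x)
      // the empirical CDF jumps from i/n to (i + 1)/n at x, so the largest
      // gap at x is the bigger of the two one-sided differences
      math.max(math.abs((i + 1) / n - c), math.abs(i / n - c))
    }.max
  }

The distributed implementation below reproduces exactly this quantity without collecting the
sample, by keeping only two candidate gaps and a count per partition.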

http://git-wip-us.apache.org/repos/asf/spark/blob/9c507577/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
new file mode 100644
index 0000000..d89b005
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import scala.annotation.varargs
+
+import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution}
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+
+/**
+ * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
+ * continuous distribution. By computing the largest difference between the empirical cumulative
+ * distribution of the sample data and the theoretical distribution, we can provide a test of
+ * the null hypothesis that the sample data comes from that theoretical distribution.
+ * For more information on KS Test:
+ * @see [[https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test]]
+ *
+ * Implementation note: We seek to implement the KS test with a minimal number of distributed
+ * passes. We sort the RDD, and then perform the following operations on a per-partition basis:
+ * calculate an empirical cumulative distribution value for each observation, and a theoretical
+ * cumulative distribution value. We know the latter to be correct, while the former will be off by
+ * a constant (how large the constant is depends on how many values precede it in other partitions).
+ * However, given that this constant simply shifts the empirical CDF upwards, but doesn't
+ * change its shape, and furthermore, that constant is the same within a given partition, we can
+ * pick 2 values in each partition that can potentially resolve to the largest global distance.
+ * Namely, we pick the minimum distance and the maximum distance. Additionally, we keep track of how
+ * many elements are in each partition. Once these three values have been returned for every
+ * partition, we can collect and operate locally. Locally, we can now adjust each distance by the
+ * appropriate constant (the cumulative sum of the number of elements in the prior partitions
+ * divided by the data set size). Finally, we take the maximum absolute value, and this is the
+ * statistic.
+ */
+private[stat] object KolmogorovSmirnovTest extends Logging {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+    type NullHypothesis = Value
+    val OneSampleTwoSided = Value("Sample follows theoretical distribution")
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param cdf `Double => Double` function to calculate the theoretical CDF
+   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] summarizing the test
+   *        results (p-value, statistic, and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult = {
+    val n = data.count().toDouble
+    val localData = data.sortBy(x => x).mapPartitions { part =>
+      val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
+      searchOneSampleCandidates(partDiffs) // candidates: local extrema
+    }.collect()
+    val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme
+    evalOneSampleP(ksStat, n.toLong)
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param distObj `RealDistribution` a theoretical distribution
+   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] summarizing the test
+   *        results (p-value, statistic, and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], distObj: RealDistribution): KolmogorovSmirnovTestResult = {
+    val cdf = (x: Double) => distObj.cumulativeProbability(x)
+    testOneSample(data, cdf)
+  }
+
+  /**
+   * Calculate unadjusted distances between the empirical CDF and the theoretical CDF in a
+   * partition
+   * @param partData `Iterator[Double]` 1 partition of a sorted RDD
+   * @param n `Double` the total size of the RDD
+   * @param cdf `Double => Double` a function that calculates the theoretical CDF of a value
+   * @return `Iterator[(Double, Double)]` unadjusted (i.e. off by a constant) potential extrema
+   *        in a partition. The first element corresponds to (empirical CDF - 1/N) - CDF,
+   *        the second element corresponds to empirical CDF - CDF. We can then search the resulting
+   *        iterator for the minimum of the first and the maximum of the second element, and provide
+   *        this as a partition's candidate extrema
+   */
+  private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double)
+    : Iterator[(Double, Double)] = {
+    // zip data with index (within that partition)
+    // calculate local (unadjusted) empirical CDF and subtract CDF
+    partData.zipWithIndex.map { case (v, ix) =>
+      // dp and dl are later adjusted by constant, when global info is available
+      val dp = (ix + 1) / n
+      val dl = ix / n
+      val cdfVal = cdf(v)
+      (dl - cdfVal, dp - cdfVal)
+    }
+  }
+
+  /**
+   * Search the unadjusted differences in a partition and return the
+   * two extrema (furthest below and furthest above CDF), along with a count of elements in that
+   * partition
+   * @param partDiffs `Iterator[(Double, Double)]` the unadjusted differences between empirical CDF
+   *                 and CDF in a partition, which come as a tuple of
+   *                 (empirical CDF - 1/N - CDF, empirical CDF - CDF)
+   * @return `Iterator[(Double, Double, Double)]` the local extrema and a count of elements
+   */
+  private def searchOneSampleCandidates(partDiffs: Iterator[(Double, Double)])
+    : Iterator[(Double, Double, Double)] = {
+    val initAcc = (Double.MaxValue, Double.MinValue, 0.0)
+    val pResults = partDiffs.foldLeft(initAcc) { case ((pMin, pMax, pCt), (dl, dp)) =>
+      (math.min(pMin, dl), math.max(pMax, dp), pCt + 1)
+    }
+    val results = if (pResults == initAcc) Array[(Double, Double, Double)]() else Array(pResults)
+    results.iterator
+  }
+
+  /**
+   * Find the global maximum distance between empirical CDF and CDF (i.e. the KS statistic) after
+   * adjusting local extrema estimates from individual partitions by the number of elements in
+   * preceding partitions
+   * @param localData `Array[(Double, Double, Double)]` a local array containing the collected
+   *                 results of `searchOneSampleCandidates` across all partitions
+   * @param n `Double` the size of the RDD
+   * @return the one-sample Kolmogorov-Smirnov statistic
+   */
+  private def searchOneSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
+    : Double = {
+    val initAcc = (Double.MinValue, 0.0)
+    // adjust differences based on the number of elements preceding it, which should provide
+    // the correct distance between empirical CDF and CDF
+    val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
+      val adjConst = prevCt / n
+      val dist1 = math.abs(minCand + adjConst)
+      val dist2 = math.abs(maxCand + adjConst)
+      val maxVal = Array(prevMax, dist1, dist2).max
+      (maxVal, prevCt + ct)
+    }
+    results._1
+  }
+
+  /**
+   * A convenience function that allows running the KS test for 1 set of sample data against
+   * a named distribution
+   * @param data the sample data that we wish to evaluate
+   * @param distName the name of the theoretical distribution
+   * @param params variable-length argument list specifying the distribution's parameters
+   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] summarizing the
+   *        test results (p-value, statistic, and null hypothesis)
+   */
+  @varargs
+  def testOneSample(data: RDD[Double], distName: String, params: Double*)
+    : KolmogorovSmirnovTestResult = {
+    val distObj =
+      distName match {
+        case "norm" => {
+          if (params.nonEmpty) {
+            // if parameters are passed, there must be exactly 2
+            require(params.length == 2, "Normal distribution requires mean and standard " +
+              "deviation as parameters")
+            new NormalDistribution(params(0), params(1))
+          } else {
+            // if no parameters are passed, initialize to standard normal
+            logInfo("No parameters specified for normal distribution, " +
+              "initialized to standard normal (i.e. N(0, 1))")
+            new NormalDistribution(0, 1)
+          }
+        }
+        case _ => throw new UnsupportedOperationException(s"$distName not yet supported through" +
+          s" convenience method. Current options are: ['norm'].")
+      }
+
+    testOneSample(data, distObj)
+  }
+
+  private def evalOneSampleP(ksStat: Double, n: Long): KolmogorovSmirnovTestResult = {
+    val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
+    new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
+  }
+}
+
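
The partition trick described in the implementation note above can be re-derived locally. The
sketch below (the names and the explicit chunking are for exposition only) mirrors
oneSampleDifferences, searchOneSampleCandidates, and searchOneSampleStatistic: split a sorted
array into "partitions", keep only each chunk's two extreme unadjusted differences plus its
count, then apply the cumulative-count shift:

  // local simulation of the distributed one-sample KS computation
  def ksStatViaChunks(sample: Array[Double], cdf: Double => Double, numChunks: Int): Double = {
    val sorted = sample.sorted
    val n = sorted.length.toDouble
    val chunkSize = math.ceil(n / numChunks).toInt
    // per-"partition" candidates: (furthest below, furthest above, count),
    // computed with local (unadjusted) indices as in oneSampleDifferences
    val candidates = sorted.grouped(chunkSize).map { chunk =>
      val diffs = chunk.zipWithIndex.map { case (v, ix) =>
        (ix / n - cdf(v), (ix + 1) / n - cdf(v))
      }
      (diffs.map(_._1).min, diffs.map(_._2).max, chunk.length.toDouble)
    }
    // shift each chunk's candidates by (elements in prior chunks) / n, as in
    // searchOneSampleStatistic, and take the largest absolute value
    candidates.foldLeft((Double.MinValue, 0.0)) {
      case ((best, prevCt), (lo, hi, ct)) =>
        val adj = prevCt / n
        (Seq(best, math.abs(lo + adj), math.abs(hi + adj)).max, prevCt + ct)
    }._1
  }

For any choice of numChunks this agrees with a direct local computation of the statistic; the
p-value is then obtained exactly as in evalOneSampleP, i.e.
1 - new KolmogorovSmirnovTest().cdf(stat, n) from commons-math.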

http://git-wip-us.apache.org/repos/asf/spark/blob/9c507577/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala
index 4784f9e..f44be13 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala
@@ -90,3 +90,20 @@ class ChiSqTestResult private[stat] (override val pValue: Double,
       super.toString
   }
 }
+
+/**
+ * :: Experimental ::
+ * Object containing the test results for the Kolmogorov-Smirnov test.
+ */
+@Experimental
+class KolmogorovSmirnovTestResult private[stat] (
+    override val pValue: Double,
+    override val statistic: Double,
+    override val nullHypothesis: String) extends TestResult[Int] {
+
+  override val degreesOfFreedom = 0
+
+  override def toString: String = {
+    "Kolmogorov-Smirnov test summary:\n" + super.toString
+  }
+}
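
Since KolmogorovSmirnovTestResult only carries the p-value, statistic, and null hypothesis
(degreesOfFreedom is fixed at 0 for this test), consuming a result is straightforward. A small
sketch, assuming `data` is an RDD[Double] and using a conventional 0.05 significance threshold:

  val result = Statistics.kolmogorovSmirnovTest(data, "norm", 0, 1)
  if (result.pValue < 0.05) {
    // a small p-value lets us reject the null hypothesis that the sample is N(0, 1)
    println(s"reject H0: D = ${result.statistic}, p = ${result.pValue}")
  }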

http://git-wip-us.apache.org/repos/asf/spark/blob/9c507577/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
index b084a5f..142b90e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
@@ -19,6 +19,10 @@ package org.apache.spark.mllib.stat
 
 import java.util.Random
 
+import org.apache.commons.math3.distribution.{ExponentialDistribution,
+  NormalDistribution, UniformRealDistribution}
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
@@ -153,4 +157,101 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
       Statistics.chiSqTest(sc.parallelize(continuousFeature, 2))
     }
   }
+
+  test("1 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") {
+    // Create theoretical distributions
+    val stdNormalDist = new NormalDistribution(0, 1)
+    val expDist = new ExponentialDistribution(0.6)
+    val unifDist = new UniformRealDistribution()
+
+    // set seeds
+    val seed = 10L
+    stdNormalDist.reseedRandomGenerator(seed)
+    expDist.reseedRandomGenerator(seed)
+    unifDist.reseedRandomGenerator(seed)
+
+    // Sample data from the distributions and parallelize it
+    val n = 100000
+    val sampledNorm = sc.parallelize(stdNormalDist.sample(n), 10)
+    val sampledExp = sc.parallelize(expDist.sample(n), 10)
+    val sampledUnif = sc.parallelize(unifDist.sample(n), 10)
+
+    // Use an Apache Commons Math local KS test to verify calculations
+    val ksTest = new KolmogorovSmirnovTest()
+    val pThreshold = 0.05
+
+    // Comparing a standard normal sample to a standard normal distribution
+    val result1 = Statistics.kolmogorovSmirnovTest(sampledNorm, "norm", 0, 1)
+    val referenceStat1 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, sampledNorm.collect())
+    val referencePVal1 = 1 - ksTest.cdf(referenceStat1, n)
+    // Verify vs apache math commons ks test
+    assert(result1.statistic ~== referenceStat1 relTol 1e-4)
+    assert(result1.pValue ~== referencePVal1 relTol 1e-4)
+    // Cannot reject null hypothesis
+    assert(result1.pValue > pThreshold)
+
+    // Comparing an exponential sample to a standard normal distribution
+    val result2 = Statistics.kolmogorovSmirnovTest(sampledExp, "norm", 0, 1)
+    val referenceStat2 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, sampledExp.collect())
+    val referencePVal2 = 1 - ksTest.cdf(referenceStat2, n)
+    // verify vs apache math commons ks test
+    assert(result2.statistic ~== referenceStat2 relTol 1e-4)
+    assert(result2.pValue ~== referencePVal2 relTol 1e-4)
+    // reject null hypothesis
+    assert(result2.pValue < pThreshold)
+
+    // Testing the use of a user-provided CDF function
+    // The distribution is not serializable, so we have to create it inside the lambda
+    val expCDF = (x: Double) => new ExponentialDistribution(0.2).cumulativeProbability(x)
+
+    // Comparing an exponential sample with mean X to an exponential distribution with mean Y,
+    // where X != Y
+    val result3 = Statistics.kolmogorovSmirnovTest(sampledExp, expCDF)
+    val referenceStat3 = ksTest.kolmogorovSmirnovStatistic(new ExponentialDistribution(0.2),
+      sampledExp.collect())
+    val referencePVal3 = 1 - ksTest.cdf(referenceStat3, sampledExp.count().toInt)
+    // verify vs apache math commons ks test
+    assert(result3.statistic ~== referenceStat3 relTol 1e-4)
+    assert(result3.pValue ~== referencePVal3 relTol 1e-4)
+    // reject null hypothesis
+    assert(result3.pValue < pThreshold)
+  }
+
+  test("1 sample Kolmogorov-Smirnov test: R implementation equivalence") {
+    /*
+      Comparing results with R's implementation of Kolmogorov-Smirnov for 1 sample
+      > sessionInfo()
+      R version 3.2.0 (2015-04-16)
+      Platform: x86_64-apple-darwin13.4.0 (64-bit)
+      > set.seed(20)
+      > v <- rnorm(20)
+      > v
+       [1]  1.16268529 -0.58592447  1.78546500 -1.33259371 -0.44656677  0.56960612
+       [7] -2.88971761 -0.86901834 -0.46170268 -0.55554091 -0.02013537 -0.15038222
+      [13] -0.62812676  1.32322085 -1.52135057 -0.43742787  0.97057758  0.02822264
+      [19] -0.08578219  0.38921440
+      > ks.test(v, pnorm, alternative = "two.sided")
+
+               One-sample Kolmogorov-Smirnov test
+
+      data:  v
+      D = 0.18874, p-value = 0.4223
+      alternative hypothesis: two-sided
+    */
+
+    val rKSStat = 0.18874
+    val rKSPVal = 0.4223
+    val rData = sc.parallelize(
+      Array(
+        1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
+        -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
+        -0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063,
+        -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
+        0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942
+      )
+    )
+    val rCompResult = Statistics.kolmogorovSmirnovTest(rData, "norm", 0, 1)
+    assert(rCompResult.statistic ~== rKSStat relTol 1e-4)
+    assert(rCompResult.pValue ~== rKSPVal relTol 1e-4)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org