Posted to commits@spark.apache.org by sr...@apache.org on 2016/03/29 18:15:41 UTC

spark git commit: [SPARK-14154][MLLIB] Simplify the implementation for Kolmogorov–Smirnov test

Repository: spark
Updated Branches:
  refs/heads/master a632bb56f -> d2a819a63


[SPARK-14154][MLLIB] Simplify the implementation for Kolmogorov–Smirnov test

## What changes were proposed in this pull request?
jira: https://issues.apache.org/jira/browse/SPARK-14154

I read the code for KolmogorovSmirnovTest and found that it can be simplified considerably by following the original definition of the statistic.

Sending a PR for discussion.
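
For reference, the simplified code follows the closed form of the one-sample KS statistic: with samples sorted as x_(1) <= ... <= x_(n) and theoretical CDF F, the supremum over x is attained at or just before a jump of the empirical CDF, so it reduces to a single max over the sorted samples:

```latex
D_n = \sup_x \lvert F_n(x) - F(x) \rvert
    = \max_{1 \le i \le n} \max\!\Big( \tfrac{i}{n} - F\big(x_{(i)}\big),\; F\big(x_{(i)}\big) - \tfrac{i-1}{n} \Big)
```

This is exactly the per-element quantity in the new one-pass map, written there with a zero-based index i, so the two terms become (i + 1) / n - f and f - i / n.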

## How was this patch tested?
unit test

Author: Yuhao Yang <hh...@gmail.com>

Closes #11954 from hhbyyh/ksoptimize.
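
As a sanity check, the simplified statistic is easy to reproduce locally on a plain collection. This is a sketch mirroring the new RDD code, not part of the patch:

```scala
// Local sketch of the simplified one-sample KS statistic: sort, zip each
// value with its zero-based index, and take the largest distance between
// the theoretical CDF and the empirical CDF on either side of each jump.
def ksStatLocal(data: Seq[Double], cdf: Double => Double): Double = {
  val n = data.size.toDouble
  data.sorted.zipWithIndex.map { case (v, i) =>
    val f = cdf(v)
    math.max(f - i / n, (i + 1) / n - f)
  }.max
}
```

For example, ksStatLocal(Seq(0.1, 0.4, 0.8), x => x) against the Uniform(0, 1) CDF gives 4/15 ≈ 0.267.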


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2a819a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2a819a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2a819a6

Branch: refs/heads/master
Commit: d2a819a6363190b946986ebf6f8001d520098c3b
Parents: a632bb5
Author: Yuhao Yang <hh...@gmail.com>
Authored: Tue Mar 29 09:16:50 2016 -0700
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Mar 29 09:16:50 2016 -0700

----------------------------------------------------------------------
 .../mllib/stat/test/KolmogorovSmirnovTest.scala | 77 +-------------------
 1 file changed, 4 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d2a819a6/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
index baf9e5e..0ec8975 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -64,11 +64,10 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
    */
   def testOneSample(data: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult = {
     val n = data.count().toDouble
-    val localData = data.sortBy(x => x).mapPartitions { part =>
-      val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
-      searchOneSampleCandidates(partDiffs) // candidates: local extrema
-    }.collect()
-    val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme
+    val ksStat = data.sortBy(x => x).zipWithIndex().map { case (v, i) =>
+      val f = cdf(v)
+      math.max(f - i / n, (i + 1) / n - f)
+    }.max()
     evalOneSampleP(ksStat, n.toLong)
   }
 
@@ -85,74 +84,6 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
   }
 
   /**
-   * Calculate unadjusted distances between the empirical CDF and the theoretical CDF in a
-   * partition
-   * @param partData `Iterator[Double]` 1 partition of a sorted RDD
-   * @param n `Double` the total size of the RDD
-   * @param cdf `Double => Double` a function that calculates the theoretical CDF of a value
-   * @return `Iterator[(Double, Double)]` Unadjusted (i.e. off by a constant) potential extrema
-   *        in a partition. The first element corresponds to the (empirical CDF - 1/N) - CDF,
-   *        the second element corresponds to empirical CDF - CDF.  We can then search the resulting
-   *        iterator for the minimum of the first and the maximum of the second element, and provide
-   *        this as a partition's candidate extrema
-   */
-  private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double)
-    : Iterator[(Double, Double)] = {
-    // zip data with index (within that partition)
-    // calculate local (unadjusted) empirical CDF and subtract CDF
-    partData.zipWithIndex.map { case (v, ix) =>
-      // dp and dl are later adjusted by constant, when global info is available
-      val dp = (ix + 1) / n
-      val dl = ix / n
-      val cdfVal = cdf(v)
-      (dl - cdfVal, dp - cdfVal)
-    }
-  }
-
-  /**
-   * Search the unadjusted differences in a partition and return the
-   * two extrema (furthest below and furthest above CDF), along with a count of elements in that
-   * partition
-   * @param partDiffs `Iterator[(Double, Double)]` the unadjusted differences between empirical CDF
-   *                 and CDF in a partition, which come as tuples of
-   *                 (empirical CDF - 1/N - CDF, empirical CDF - CDF)
-   * @return `Iterator[(Double, Double, Double)]` the local extrema and a count of elements
-   */
-  private def searchOneSampleCandidates(partDiffs: Iterator[(Double, Double)])
-    : Iterator[(Double, Double, Double)] = {
-    val initAcc = (Double.MaxValue, Double.MinValue, 0.0)
-    val pResults = partDiffs.foldLeft(initAcc) { case ((pMin, pMax, pCt), (dl, dp)) =>
-      (math.min(pMin, dl), math.max(pMax, dp), pCt + 1)
-    }
-    val results = if (pResults == initAcc) Array[(Double, Double, Double)]() else Array(pResults)
-    results.iterator
-  }
-
-  /**
-   * Find the global maximum distance between empirical CDF and CDF (i.e. the KS statistic) after
-   * adjusting local extrema estimates from individual partitions with the number of elements in
-   * preceding partitions
-   * @param localData `Array[(Double, Double, Double)]` A local array containing the collected
-   *                 results of `searchOneSampleCandidates` across all partitions
-   * @param n `Double` The size of the RDD
-   * @return The one-sample Kolmogorov-Smirnov statistic
-   */
-  private def searchOneSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
-    : Double = {
-    val initAcc = (Double.MinValue, 0.0)
-    // adjust differences based on the number of elements preceding it, which should provide
-    // the correct distance between empirical CDF and CDF
-    val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
-      val adjConst = prevCt / n
-      val dist1 = math.abs(minCand + adjConst)
-      val dist2 = math.abs(maxCand + adjConst)
-      val maxVal = Array(prevMax, dist1, dist2).max
-      (maxVal, prevCt + ct)
-    }
-    results._1
-  }
-
-  /**
    * A convenience function that allows running the KS test for 1 set of sample data against
    * a named distribution
    * @param data the sample data that we wish to evaluate

