Posted to commits@spark.apache.org by sr...@apache.org on 2015/02/09 11:12:17 UTC

spark git commit: SPARK-2149. [MLLIB] Univariate kernel density estimation

Repository: spark
Updated Branches:
  refs/heads/master 4dfe180fc -> 0793ee1b4


SPARK-2149. [MLLIB] Univariate kernel density estimation

Author: Sandy Ryza <sa...@cloudera.com>

Closes #1093 from sryza/sandy-spark-2149 and squashes the following commits:

5f06b33 [Sandy Ryza] More review comments
0f73060 [Sandy Ryza] Respond to Sean's review comments
0dfa005 [Sandy Ryza] SPARK-2149. Univariate kernel density estimation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0793ee1b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0793ee1b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0793ee1b

Branch: refs/heads/master
Commit: 0793ee1b4dea1f4b0df749e8ad7c1ab70b512faf
Parents: 4dfe180
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Mon Feb 9 10:12:12 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Feb 9 10:12:12 2015 +0000

----------------------------------------------------------------------
 .../apache/spark/mllib/stat/KernelDensity.scala | 71 ++++++++++++++++++++
 .../apache/spark/mllib/stat/Statistics.scala    | 14 ++++
 .../spark/mllib/stat/KernelDensitySuite.scala   | 47 +++++++++++++
 3 files changed, 132 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0793ee1b/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala
new file mode 100644
index 0000000..0deef11
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.rdd.RDD
+
+private[stat] object KernelDensity {
+  /**
+   * Given a set of samples from a distribution, estimates its density at each of the given
+   * evaluation points, using a Gaussian kernel with the given standard deviation.
+   */
+  def estimate(samples: RDD[Double], standardDeviation: Double,
+      evaluationPoints: Array[Double]): Array[Double] = {
+    if (standardDeviation <= 0.0) {
+      throw new IllegalArgumentException("Standard deviation must be positive")
+    }
+
+    // This gets used in each Gaussian PDF computation, so compute it up front
+    val logStandardDeviationPlusHalfLog2Pi =
+      Math.log(standardDeviation) + 0.5 * Math.log(2 * Math.PI)
+
+    val (points, count) = samples.aggregate((new Array[Double](evaluationPoints.length), 0))(
+      (x, y) => {
+        var i = 0
+        while (i < evaluationPoints.length) {
+          x._1(i) += normPdf(y, standardDeviation, logStandardDeviationPlusHalfLog2Pi,
+            evaluationPoints(i))
+          i += 1
+        }
+        (x._1, x._2 + 1)  // carry the summed densities and increment the sample count
+      },
+      (x, y) => {
+        var i = 0
+        while (i < evaluationPoints.length) {
+          x._1(i) += y._1(i)
+          i += 1
+        }
+        (x._1, x._2 + y._2)
+      })
+
+    var i = 0
+    while (i < points.length) {
+      points(i) /= count
+      i += 1
+    }
+    points
+  }
+
+  private def normPdf(mean: Double, standardDeviation: Double,
+      logStandardDeviationPlusHalfLog2Pi: Double, x: Double): Double = {
+    val x0 = x - mean
+    val x1 = x0 / standardDeviation
+    val logDensity = -0.5 * x1 * x1 - logStandardDeviationPlusHalfLog2Pi
+    Math.exp(logDensity)
+  }
+}
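
For readers following the implementation: the RDD.aggregate above sums, for each
evaluation point, the Gaussian PDF centered at every sample, then divides by the
total sample count. A hypothetical single-machine sketch of the same computation
(not part of this commit; names are illustrative):

    // Local, non-distributed rendering of the estimator above, for illustration only.
    def estimateLocal(
        samples: Seq[Double],
        standardDeviation: Double,
        evaluationPoints: Array[Double]): Array[Double] = {
      require(standardDeviation > 0.0, "Standard deviation must be positive")
      // Precomputed once, exactly as in the committed code.
      val logSdPlusHalfLog2Pi = math.log(standardDeviation) + 0.5 * math.log(2 * math.Pi)
      evaluationPoints.map { x =>
        // Average the kernel density contributions of all samples at point x.
        samples.map { mean =>
          val z = (x - mean) / standardDeviation
          math.exp(-0.5 * z * z - logSdPlusHalfLog2Pi)
        }.sum / samples.size
      }
    }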

http://git-wip-us.apache.org/repos/asf/spark/blob/0793ee1b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
index b3fad0c..3256162 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
@@ -149,4 +149,18 @@ object Statistics {
   def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = {
     ChiSqTest.chiSquaredFeatures(data)
   }
+
+  /**
+   * Given an empirical distribution defined by the input RDD of samples, estimate its density at
+   * each of the given evaluation points using a Gaussian kernel.
+   *
+   * @param samples The samples RDD used to define the empirical distribution.
+   * @param standardDeviation The standard deviation of the kernel Gaussians.
+   * @param evaluationPoints The points at which to estimate densities.
+   * @return An array the same size as evaluationPoints with the density at each point.
+   */
+  def kernelDensity(samples: RDD[Double], standardDeviation: Double,
+      evaluationPoints: Iterable[Double]): Array[Double] = {
+    KernelDensity.estimate(samples, standardDeviation, evaluationPoints.toArray)
+  }
 }
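
A minimal usage sketch for the new public entry point (assumes an existing
SparkContext named sc; the sample and evaluation values are illustrative):

    import org.apache.spark.mllib.stat.Statistics
    import org.apache.spark.rdd.RDD

    val samples: RDD[Double] = sc.parallelize(Seq(1.0, 2.0, 2.5, 4.0))
    // Density of the empirical distribution at 2.0 and 3.0, Gaussian kernel, sigma = 1.0.
    val densities: Array[Double] = Statistics.kernelDensity(samples, 1.0, Seq(2.0, 3.0))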

http://git-wip-us.apache.org/repos/asf/spark/blob/0793ee1b/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala
new file mode 100644
index 0000000..f6a1e19
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.scalatest.FunSuite
+
+import org.apache.commons.math3.distribution.NormalDistribution
+
+import org.apache.spark.mllib.util.LocalClusterSparkContext
+
+class KernelDensitySuite extends FunSuite with LocalClusterSparkContext {
+  test("kernel density single sample") {
+    val rdd = sc.parallelize(Array(5.0))
+    val evaluationPoints = Array(5.0, 6.0)
+    val densities = KernelDensity.estimate(rdd, 3.0, evaluationPoints)
+    val normal = new NormalDistribution(5.0, 3.0)
+    val acceptableErr = 1e-6
+    assert(math.abs(densities(0) - normal.density(5.0)) < acceptableErr)
+    assert(math.abs(densities(1) - normal.density(6.0)) < acceptableErr)
+  }
+
+  test("kernel density multiple samples") {
+    val rdd = sc.parallelize(Array(5.0, 10.0))
+    val evaluationPoints = Array(5.0, 6.0)
+    val densities = KernelDensity.estimate(rdd, 3.0, evaluationPoints)
+    val normal1 = new NormalDistribution(5.0, 3.0)
+    val normal2 = new NormalDistribution(10.0, 3.0)
+    val acceptableErr = 1e-6
+    assert(math.abs(densities(0) - (normal1.density(5.0) + normal2.density(5.0)) / 2) < acceptableErr)
+    assert(math.abs(densities(1) - (normal1.density(6.0) + normal2.density(6.0)) / 2) < acceptableErr)
+  }
+}
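
As a sanity check on the expected values in the multiple-samples test: with two
samples, the estimate at each point is the average of the two kernel densities.
A standalone sketch using the same commons-math3 class as the suite:

    import org.apache.commons.math3.distribution.NormalDistribution

    // Gaussian kernels centered at the samples 5.0 and 10.0, with sigma = 3.0.
    val normal1 = new NormalDistribution(5.0, 3.0)
    val normal2 = new NormalDistribution(10.0, 3.0)
    val expectedAt5 = (normal1.density(5.0) + normal2.density(5.0)) / 2
    val expectedAt6 = (normal1.density(6.0) + normal2.density(6.0)) / 2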

