You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/07/14 23:44:52 UTC
spark git commit: [SPARK-4362] [MLLIB] Make prediction probability
available in NaiveBayesModel
Repository: spark
Updated Branches:
refs/heads/master 4b5cfc988 -> 740b034f1
[SPARK-4362] [MLLIB] Make prediction probability available in NaiveBayesModel
Add predictProbabilities to Naive Bayes, return class probabilities.
Continues https://github.com/apache/spark/pull/6761
Author: Sean Owen <so...@cloudera.com>
Closes #7376 from srowen/SPARK-4362 and squashes the following commits:
23d5a76 [Sean Owen] Fix model.labels -> model.theta
95d91fb [Sean Owen] Check that predicted probabilities sum to 1
b32d1c8 [Sean Owen] Add predictProbabilities to Naive Bayes, return class probabilities
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/740b034f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/740b034f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/740b034f
Branch: refs/heads/master
Commit: 740b034f1ca885a386f5a9ef7e0c81c714b047ff
Parents: 4b5cfc9
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Jul 14 22:44:54 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jul 14 22:44:54 2015 +0100
----------------------------------------------------------------------
.../spark/mllib/classification/NaiveBayes.scala | 76 +++++++++++++++-----
.../mllib/classification/NaiveBayesSuite.scala | 55 +++++++++++++-
2 files changed, 113 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/740b034f/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index f51ee36..9e379d7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -93,26 +93,70 @@ class NaiveBayesModel private[mllib] (
override def predict(testData: Vector): Double = {
modelType match {
case Multinomial =>
- val prob = thetaMatrix.multiply(testData)
- BLAS.axpy(1.0, piVector, prob)
- labels(prob.argmax)
+ labels(multinomialCalculation(testData).argmax)
case Bernoulli =>
- testData.foreachActive { (index, value) =>
- if (value != 0.0 && value != 1.0) {
- throw new SparkException(
- s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
- }
- }
- val prob = thetaMinusNegTheta.get.multiply(testData)
- BLAS.axpy(1.0, piVector, prob)
- BLAS.axpy(1.0, negThetaSum.get, prob)
- labels(prob.argmax)
- case _ =>
- // This should never happen.
- throw new UnknownError(s"Invalid modelType: $modelType.")
+ labels(bernoulliCalculation(testData).argmax)
+ }
+ }
+
+ /**
+ * Predict posterior class probabilities for the given data set using the trained model.
+ *
+ * @param testData RDD representing data points to be predicted
+ * @return an RDD[Vector] where each entry contains the predicted posterior class probabilities,
+ * in the same order as class labels
+ */
+ def predictProbabilities(testData: RDD[Vector]): RDD[Vector] = {
+ val bcModel = testData.context.broadcast(this)
+ testData.mapPartitions { iter =>
+ val model = bcModel.value
+ iter.map(model.predictProbabilities)
}
}
+ /**
+ * Predict posterior class probabilities for a single data point using the trained model.
+ *
+ * @param testData vector representing a single data point
+ * @return predicted posterior class probabilities from the trained model,
+ * in the same order as class labels
+ */
+ def predictProbabilities(testData: Vector): Vector = {
+ modelType match {
+ case Multinomial =>
+ posteriorProbabilities(multinomialCalculation(testData))
+ case Bernoulli =>
+ posteriorProbabilities(bernoulliCalculation(testData))
+ }
+ }
+
+ private def multinomialCalculation(testData: Vector) = {
+ val prob = thetaMatrix.multiply(testData)
+ BLAS.axpy(1.0, piVector, prob)
+ prob
+ }
+
+ private def bernoulliCalculation(testData: Vector) = {
+ testData.foreachActive((_, value) =>
+ if (value != 0.0 && value != 1.0) {
+ throw new SparkException(
+ s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
+ }
+ )
+ val prob = thetaMinusNegTheta.get.multiply(testData)
+ BLAS.axpy(1.0, piVector, prob)
+ BLAS.axpy(1.0, negThetaSum.get, prob)
+ prob
+ }
+
+ private def posteriorProbabilities(logProb: DenseVector) = {
+ val logProbArray = logProb.toArray
+ val maxLog = logProbArray.max
+ val scaledProbs = logProbArray.map(lp => math.exp(lp - maxLog))
+ val probSum = scaledProbs.sum
+ new DenseVector(scaledProbs.map(_ / probSum))
+ }
+
override def save(sc: SparkContext, path: String): Unit = {
val data = NaiveBayesModel.SaveLoadV2_0.Data(labels, pi, theta, modelType)
NaiveBayesModel.SaveLoadV2_0.save(sc, path, data)
http://git-wip-us.apache.org/repos/asf/spark/blob/740b034f/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index f7fc873..cffa1ab 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -19,13 +19,14 @@ package org.apache.spark.mllib.classification
import scala.util.Random
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Vector => BV}
import breeze.stats.distributions.{Multinomial => BrzMultinomial}
import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
+import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.util.Utils
object NaiveBayesSuite {
@@ -154,6 +155,29 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
// Test prediction on Array.
validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+
+ // Test posteriors
+ validationData.map(_.features).foreach { features =>
+ val predicted = model.predictProbabilities(features).toArray
+ assert(predicted.sum ~== 1.0 relTol 1.0e-10)
+ val expected = expectedMultinomialProbabilities(model, features)
+ expected.zip(predicted).foreach { case (e, p) => assert(e ~== p relTol 1.0e-10) }
+ }
+ }
+
+ /**
+ * @param model Multinomial Naive Bayes model
+ * @param testData input to compute posterior probabilities for
+ * @return posterior class probabilities (in order of labels) for input
+ */
+ private def expectedMultinomialProbabilities(model: NaiveBayesModel, testData: Vector) = {
+ val piVector = new BDV(model.pi)
+ // model.theta is row-major; treat it as col-major representation of transpose, and transpose:
+ val thetaMatrix = new BDM(model.theta(0).length, model.theta.length, model.theta.flatten).t
+ val logClassProbs: BV[Double] = piVector + (thetaMatrix * testData.toBreeze)
+ val classProbs = logClassProbs.toArray.map(math.exp)
+ val classProbsSum = classProbs.sum
+ classProbs.map(_ / classProbsSum)
}
test("Naive Bayes Bernoulli") {
@@ -182,6 +206,33 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
// Test prediction on Array.
validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+
+ // Test posteriors
+ validationData.map(_.features).foreach { features =>
+ val predicted = model.predictProbabilities(features).toArray
+ assert(predicted.sum ~== 1.0 relTol 1.0e-10)
+ val expected = expectedBernoulliProbabilities(model, features)
+ expected.zip(predicted).foreach { case (e, p) => assert(e ~== p relTol 1.0e-10) }
+ }
+ }
+
+ /**
+ * @param model Bernoulli Naive Bayes model
+ * @param testData input to compute posterior probabilities for
+ * @return posterior class probabilities (in order of labels) for input
+ */
+ private def expectedBernoulliProbabilities(model: NaiveBayesModel, testData: Vector) = {
+ val piVector = new BDV(model.pi)
+ val thetaMatrix = new BDM(model.theta(0).length, model.theta.length, model.theta.flatten).t
+ val negThetaMatrix = new BDM(model.theta(0).length, model.theta.length,
+ model.theta.flatten.map(v => math.log(1.0 - math.exp(v)))).t
+ val testBreeze = testData.toBreeze
+ val negTestBreeze = new BDV(Array.fill(testBreeze.size)(1.0)) - testBreeze
+ val piTheta: BV[Double] = piVector + (thetaMatrix * testBreeze)
+ val logClassProbs: BV[Double] = piTheta + (negThetaMatrix * negTestBreeze)
+ val classProbs = logClassProbs.toArray.map(math.exp)
+ val classProbsSum = classProbs.sum
+ classProbs.map(_ / classProbsSum)
}
test("detect negative values") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org