You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2017/08/23 04:16:38 UTC

spark git commit: [SPARK-12664][ML] Expose probability in mlp model

Repository: spark
Updated Branches:
  refs/heads/master d58a3507e -> d6b30edd4


[SPARK-12664][ML] Expose probability in mlp model

## What changes were proposed in this pull request?

Modify MLP model to inherit `ProbabilisticClassificationModel` and so that it can expose the probability  column when transforming data.

## How was this patch tested?

Test added.

Author: WeichenXu <We...@outlook.com>

Closes #17373 from WeichenXu123/expose_probability_in_mlp_model.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6b30edd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6b30edd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6b30edd

Branch: refs/heads/master
Commit: d6b30edd4974b593cc8085f680ccb524c7722c85
Parents: d58a350
Author: Weichen Xu <we...@databricks.com>
Authored: Tue Aug 22 21:16:34 2017 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Tue Aug 22 21:16:34 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ml/ann/Layer.scala   | 53 +++++++++++++++++---
 .../MultilayerPerceptronClassifier.scala        | 17 +++++--
 .../org/apache/spark/ml/ann/GradientSuite.scala |  2 +-
 .../MultilayerPerceptronClassifierSuite.scala   | 42 ++++++++++++++++
 python/pyspark/ml/classification.py             |  4 +-
 5 files changed, 103 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d6b30edd/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala b/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
index e7e0dae..014ff07 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
@@ -361,17 +361,42 @@ private[ann] trait TopologyModel extends Serializable {
    * Forward propagation
    *
    * @param data input data
+   * @param includeLastLayer Include the last layer in the output. In
+   *                         MultilayerPerceptronClassifier, the last layer is always softmax;
+   *                         the last layer of outputs is needed for class predictions, but not
+   *                         for rawPrediction.
+   *
    * @return array of outputs for each of the layers
    */
-  def forward(data: BDM[Double]): Array[BDM[Double]]
+  def forward(data: BDM[Double], includeLastLayer: Boolean): Array[BDM[Double]]
 
   /**
-   * Prediction of the model
+   * Prediction of the model. See {@link ProbabilisticClassificationModel}
    *
-   * @param data input data
+   * @param features input features
    * @return prediction
    */
-  def predict(data: Vector): Vector
+  def predict(features: Vector): Vector
+
+  /**
+   * Raw prediction of the model. See {@link ProbabilisticClassificationModel}
+   *
+   * @param features input features
+   * @return raw prediction
+   *
+   * Note: This interface is only used for classification Model.
+   */
+  def predictRaw(features: Vector): Vector
+
+  /**
+   * Probability of the model. See {@link ProbabilisticClassificationModel}
+   *
+   * @param rawPrediction raw prediction vector
+   * @return probability
+   *
+   * Note: This interface is only used for classification Model.
+   */
+  def raw2ProbabilityInPlace(rawPrediction: Vector): Vector
 
   /**
    * Computes gradient for the network
@@ -463,7 +488,7 @@ private[ml] class FeedForwardModel private(
   private var outputs: Array[BDM[Double]] = null
   private var deltas: Array[BDM[Double]] = null
 
-  override def forward(data: BDM[Double]): Array[BDM[Double]] = {
+  override def forward(data: BDM[Double], includeLastLayer: Boolean): Array[BDM[Double]] = {
     // Initialize output arrays for all layers. Special treatment for InPlace
     val currentBatchSize = data.cols
     // TODO: allocate outputs as one big array and then create BDMs from it
@@ -481,7 +506,8 @@ private[ml] class FeedForwardModel private(
       }
     }
     layerModels(0).eval(data, outputs(0))
-    for (i <- 1 until layerModels.length) {
+    val end = if (includeLastLayer) layerModels.length else layerModels.length - 1
+    for (i <- 1 until end) {
       layerModels(i).eval(outputs(i - 1), outputs(i))
     }
     outputs
@@ -492,7 +518,7 @@ private[ml] class FeedForwardModel private(
     target: BDM[Double],
     cumGradient: Vector,
     realBatchSize: Int): Double = {
-    val outputs = forward(data)
+    val outputs = forward(data, true)
     val currentBatchSize = data.cols
     // TODO: allocate deltas as one big array and then create BDMs from it
     if (deltas == null || deltas(0).cols != currentBatchSize) {
@@ -527,9 +553,20 @@ private[ml] class FeedForwardModel private(
 
   override def predict(data: Vector): Vector = {
     val size = data.size
-    val result = forward(new BDM[Double](size, 1, data.toArray))
+    val result = forward(new BDM[Double](size, 1, data.toArray), true)
     Vectors.dense(result.last.toArray)
   }
+
+  override def predictRaw(data: Vector): Vector = {
+    val result = forward(new BDM[Double](data.size, 1, data.toArray), false)
+    Vectors.dense(result(result.length - 2).toArray)
+  }
+
+  override def raw2ProbabilityInPlace(data: Vector): Vector = {
+    val dataMatrix = new BDM[Double](data.size, 1, data.toArray)
+    layerModels.last.eval(dataMatrix, dataMatrix)
+    data
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d6b30edd/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index ceba11e..14a0c9f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -32,7 +32,7 @@ import org.apache.spark.ml.util._
 import org.apache.spark.sql.Dataset
 
 /** Params for Multilayer Perceptron. */
-private[classification] trait MultilayerPerceptronParams extends PredictorParams
+private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
   with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {
 
   import MultilayerPerceptronClassifier._
@@ -143,7 +143,8 @@ private object LabelConverter {
 @Since("1.5.0")
 class MultilayerPerceptronClassifier @Since("1.5.0") (
     @Since("1.5.0") override val uid: String)
-  extends Predictor[Vector, MultilayerPerceptronClassifier, MultilayerPerceptronClassificationModel]
+  extends ProbabilisticClassifier[Vector, MultilayerPerceptronClassifier,
+    MultilayerPerceptronClassificationModel]
   with MultilayerPerceptronParams with DefaultParamsWritable {
 
   @Since("1.5.0")
@@ -301,13 +302,13 @@ class MultilayerPerceptronClassificationModel private[ml] (
     @Since("1.5.0") override val uid: String,
     @Since("1.5.0") val layers: Array[Int],
     @Since("2.0.0") val weights: Vector)
-  extends PredictionModel[Vector, MultilayerPerceptronClassificationModel]
+  extends ProbabilisticClassificationModel[Vector, MultilayerPerceptronClassificationModel]
   with Serializable with MLWritable {
 
   @Since("1.6.0")
   override val numFeatures: Int = layers.head
 
-  private val mlpModel = FeedForwardTopology
+  private[ml] val mlpModel = FeedForwardTopology
     .multiLayerPerceptron(layers, softmaxOnTop = true)
     .model(weights)
 
@@ -335,6 +336,14 @@ class MultilayerPerceptronClassificationModel private[ml] (
   @Since("2.0.0")
   override def write: MLWriter =
     new MultilayerPerceptronClassificationModel.MultilayerPerceptronClassificationModelWriter(this)
+
+  override protected def raw2probabilityInPlace(rawPrediction: Vector): Vector = {
+    mlpModel.raw2ProbabilityInPlace(rawPrediction)
+  }
+
+  override protected def predictRaw(features: Vector): Vector = mlpModel.predictRaw(features)
+
+  override def numClasses: Int = layers.last
 }
 
 @Since("2.0.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/d6b30edd/mllib/src/test/scala/org/apache/spark/ml/ann/GradientSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/ann/GradientSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/ann/GradientSuite.scala
index f0c0183..2f22564 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/ann/GradientSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/ann/GradientSuite.scala
@@ -64,7 +64,7 @@ class GradientSuite extends SparkFunSuite with MLlibTestSparkContext {
   }
 
   private def computeLoss(input: BDM[Double], target: BDM[Double], model: TopologyModel): Double = {
-    val outputs = model.forward(input)
+    val outputs = model.forward(input, true)
     model.layerModels.last match {
       case layerWithLoss: LossFunction =>
         layerWithLoss.loss(outputs.last, target, new BDM[Double](target.rows, target.cols))

http://git-wip-us.apache.org/repos/asf/spark/blob/d6b30edd/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
index ce54c3d..c294e4a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
 import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.functions._
 
 class MultilayerPerceptronClassifierSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
@@ -82,6 +83,47 @@ class MultilayerPerceptronClassifierSuite
     }
   }
 
+  test("Predicted class probabilities: calibration on toy dataset") {
+    val layers = Array[Int](4, 5, 2)
+
+    val strongDataset = Seq(
+      (Vectors.dense(1, 2, 3, 4), 0d, Vectors.dense(1d, 0d)),
+      (Vectors.dense(4, 3, 2, 1), 1d, Vectors.dense(0d, 1d)),
+      (Vectors.dense(1, 1, 1, 1), 0d, Vectors.dense(.5, .5)),
+      (Vectors.dense(1, 1, 1, 1), 1d, Vectors.dense(.5, .5))
+    ).toDF("features", "label", "expectedProbability")
+    val trainer = new MultilayerPerceptronClassifier()
+      .setLayers(layers)
+      .setBlockSize(1)
+      .setSeed(123L)
+      .setMaxIter(100)
+      .setSolver("l-bfgs")
+    val model = trainer.fit(strongDataset)
+    val result = model.transform(strongDataset)
+    result.select("probability", "expectedProbability").collect().foreach {
+      case Row(p: Vector, e: Vector) =>
+        assert(p ~== e absTol 1e-3)
+    }
+  }
+
+  test("test model probability") {
+    val layers = Array[Int](2, 5, 2)
+    val trainer = new MultilayerPerceptronClassifier()
+      .setLayers(layers)
+      .setBlockSize(1)
+      .setSeed(123L)
+      .setMaxIter(100)
+      .setSolver("l-bfgs")
+    val model = trainer.fit(dataset)
+    model.setProbabilityCol("probability")
+    val result = model.transform(dataset)
+    val features2prob = udf { features: Vector => model.mlpModel.predict(features) }
+    result.select(features2prob(col("features")), col("probability")).collect().foreach {
+      case Row(p1: Vector, p2: Vector) =>
+        assert(p1 ~== p2 absTol 1e-3)
+    }
+  }
+
   test("Test setWeights by training restart") {
     val dataFrame = Seq(
       (Vectors.dense(0.0, 0.0), 0.0),

http://git-wip-us.apache.org/repos/asf/spark/blob/d6b30edd/python/pyspark/ml/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 235cee4..f0f42a3 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -1378,7 +1378,7 @@ class MultilayerPerceptronClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol,
     >>> testDF = spark.createDataFrame([
     ...     (Vectors.dense([1.0, 0.0]),),
     ...     (Vectors.dense([0.0, 0.0]),)], ["features"])
-    >>> model.transform(testDF).show()
+    >>> model.transform(testDF).select("features", "prediction").show()
     +---------+----------+
     | features|prediction|
     +---------+----------+
@@ -1512,7 +1512,7 @@ class MultilayerPerceptronClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol,
         return self.getOrDefault(self.initialWeights)
 
 
-class MultilayerPerceptronClassificationModel(JavaModel, JavaPredictionModel, JavaMLWritable,
+class MultilayerPerceptronClassificationModel(JavaModel, JavaClassificationModel, JavaMLWritable,
                                               JavaMLReadable):
     """
     Model fitted by MultilayerPerceptronClassifier.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org