You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2020/05/05 13:36:26 UTC

[spark] branch master updated: [SPARK-31603][ML] AFT uses common functions in RDDLossFunction

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 701deac  [SPARK-31603][ML] AFT uses common functions in RDDLossFunction
701deac is described below

commit 701deac88d09690ddf9d28b9c79814aecfd3277d
Author: zhengruifeng <ru...@foxmail.com>
AuthorDate: Tue May 5 08:35:20 2020 -0500

    [SPARK-31603][ML] AFT uses common functions in RDDLossFunction
    
    ### What changes were proposed in this pull request?
    1, make AFT reuse common functions in `ml.optim`, rather than making its own impl.
    
    ### Why are the changes needed?
    The logic in optimizing AFT is quite similar to other algorithms like other algs based on `RDDLossFunction`,
    We should reuse the common functions.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    existing testsuites
    
    Closes #28404 from zhengruifeng/mv_aft_optim.
    
    Authored-by: zhengruifeng <ru...@foxmail.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../spark/ml/optim/aggregator/AFTAggregator.scala  | 162 +++++++++++++++
 .../aggregator/DifferentiableLossAggregator.scala  |   9 +-
 .../ml/regression/AFTSurvivalRegression.scala      | 228 +--------------------
 3 files changed, 173 insertions(+), 226 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala
new file mode 100644
index 0000000..6482c61
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.regression.AFTPoint
+
+/**
+ * AFTAggregator computes the gradient and loss for a AFT loss function,
+ * as used in AFT survival regression for samples in sparse or dense vector in an online fashion.
+ *
+ * The loss function and likelihood function under the AFT model based on:
+ * Lawless, J. F., Statistical Models and Methods for Lifetime Data,
+ * New York: John Wiley & Sons, Inc. 2003.
+ *
+ * Two AFTAggregator can be merged together to have a summary of loss and gradient of
+ * the corresponding joint dataset.
+ *
+ * Given the values of the covariates $x^{'}$, for random lifetime $t_{i}$ of subjects i = 1,..,n,
+ * with possible right-censoring, the likelihood function under the AFT model is given as
+ *
+ * <blockquote>
+ *    $$
+ *    L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}
+ *      (\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0}
+ *    (\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}}
+ *    $$
+ * </blockquote>
+ *
+ * Where $\delta_{i}$ is the indicator of the event has occurred i.e. uncensored or not.
+ * Using $\epsilon_{i}=\frac{\log{t_{i}}-x^{'}\beta}{\sigma}$, the log-likelihood function
+ * assumes the form
+ *
+ * <blockquote>
+ *    $$
+ *    \iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+
+ *    \delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}]
+ *    $$
+ * </blockquote>
+ * Where $S_{0}(\epsilon_{i})$ is the baseline survivor function,
+ * and $f_{0}(\epsilon_{i})$ is corresponding density function.
+ *
+ * The most commonly used log-linear survival regression method is based on the Weibull
+ * distribution of the survival time. The Weibull distribution for lifetime corresponding
+ * to extreme value distribution for log of the lifetime,
+ * and the $S_{0}(\epsilon)$ function is
+ *
+ * <blockquote>
+ *    $$
+ *    S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}})
+ *    $$
+ * </blockquote>
+ *
+ * and the $f_{0}(\epsilon_{i})$ function is
+ *
+ * <blockquote>
+ *    $$
+ *    f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}})
+ *    $$
+ * </blockquote>
+ *
+ * The log-likelihood function for Weibull distribution of lifetime is
+ *
+ * <blockquote>
+ *    $$
+ *    \iota(\beta,\sigma)=
+ *    -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}]
+ *    $$
+ * </blockquote>
+ *
+ * Due to minimizing the negative log-likelihood equivalent to maximum a posteriori probability,
+ * the loss function we use to optimize is $-\iota(\beta,\sigma)$.
+ * The gradient functions for $\beta$ and $\log\sigma$ respectively are
+ *
+ * <blockquote>
+ *    $$
+ *    \frac{\partial (-\iota)}{\partial \beta}=
+ *    \sum_{1=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} \\
+ *
+ *    \frac{\partial (-\iota)}{\partial (\log\sigma)}=
+ *    \sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}]
+ *    $$
+ * </blockquote>
+ *
+ * @param bcCoefficients The broadcasted value includes three part: 1, regression coefficients
+ *                       corresponding to the features; 2, the intercept; 3, the log of scale
+ *                       parameter.
+ * @param fitIntercept   Whether to fit an intercept term.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ */
+
+private[ml] class AFTAggregator(
+    bcFeaturesStd: Broadcast[Array[Double]],
+    fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
+  extends DifferentiableLossAggregator[AFTPoint, AFTAggregator] {
+
+  protected override val dim: Int = bcCoefficients.value.size
+
+  /**
+   * Add a new training data to this AFTAggregator, and update the loss and gradient
+   * of the objective function.
+   *
+   * @param data The AFTPoint representation for one data point to be added into this aggregator.
+   * @return This AFTAggregator object.
+   */
+  def add(data: AFTPoint): this.type = {
+    val coefficients = bcCoefficients.value.toArray
+    val intercept = coefficients(dim - 2)
+    // sigma is the scale parameter of the AFT model
+    val sigma = math.exp(coefficients(dim - 1))
+
+    val xi = data.features
+    val ti = data.label
+    val delta = data.censor
+
+    require(ti > 0.0, "The lifetime or label should be  greater than 0.")
+
+    val localFeaturesStd = bcFeaturesStd.value
+
+    val margin = {
+      var sum = 0.0
+      xi.foreachNonZero { (index, value) =>
+        if (localFeaturesStd(index) != 0.0) {
+          sum += coefficients(index) * (value / localFeaturesStd(index))
+        }
+      }
+      sum + intercept
+    }
+    val epsilon = (math.log(ti) - margin) / sigma
+
+    lossSum += delta * math.log(sigma) - delta * epsilon + math.exp(epsilon)
+
+    val multiplier = (delta - math.exp(epsilon)) / sigma
+
+    xi.foreachNonZero { (index, value) =>
+      if (localFeaturesStd(index) != 0.0) {
+        gradientSumArray(index) += multiplier * (value / localFeaturesStd(index))
+      }
+    }
+    gradientSumArray(dim - 2) += { if (fitIntercept) multiplier else 0.0 }
+    gradientSumArray(dim - 1) += delta + multiplier * sigma * epsilon
+
+    weightSum += 1.0
+    this
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala
index 403c28f..b247723 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala
@@ -53,14 +53,7 @@ private[ml] trait DifferentiableLossAggregator[
     if (other.weightSum != 0) {
       weightSum += other.weightSum
       lossSum += other.lossSum
-
-      var i = 0
-      val localThisGradientSumArray = this.gradientSumArray
-      val localOtherGradientSumArray = other.gradientSumArray
-      while (i < dim) {
-        localThisGradientSumArray(i) += localOtherGradientSumArray(i)
-        i += 1
-      }
+      BLAS.getBLAS(dim).daxpy(dim, 1.0, other.gradientSumArray, 1, this.gradientSumArray, 1)
     }
     this
   }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index 2da65a0..8cc5f86 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -20,15 +20,16 @@ package org.apache.spark.ml.regression
 import scala.collection.mutable
 
 import breeze.linalg.{DenseVector => BDV}
-import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS}
+import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.PredictorParams
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
+import org.apache.spark.ml.optim.aggregator.AFTAggregator
+import org.apache.spark.ml.optim.loss.RDDLossFunction
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.stat._
@@ -208,7 +209,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
     )
 
     val featuresStd = featuresSummarizer.std.toArray
-    val numFeatures = featuresStd.size
+    val numFeatures = featuresStd.length
 
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
@@ -226,9 +227,9 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
     }
 
     val bcFeaturesStd = instances.context.broadcast(featuresStd)
-
-    val costFun = new AFTCostFun(instances, $(fitIntercept), bcFeaturesStd, $(aggregationDepth))
+    val getAggregatorFunc = new AFTAggregator(bcFeaturesStd, $(fitIntercept))(_)
     val optimizer = new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
+    val costFun = new RDDLossFunction(instances, getAggregatorFunc, None, $(aggregationDepth))
 
     /*
        The parameters vector has three parts:
@@ -258,15 +259,15 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
     bcFeaturesStd.destroy()
     if (handlePersistence) instances.unpersist()
 
-    val rawCoefficients = parameters.slice(2, parameters.length)
+    val rawCoefficients = parameters.take(numFeatures)
     var i = 0
     while (i < numFeatures) {
       rawCoefficients(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
       i += 1
     }
     val coefficients = Vectors.dense(rawCoefficients)
-    val intercept = parameters(1)
-    val scale = math.exp(parameters(0))
+    val intercept = parameters(numFeatures)
+    val scale = math.exp(parameters(numFeatures + 1))
     new AFTSurvivalRegressionModel(uid, coefficients, intercept, scale)
   }
 
@@ -434,215 +435,6 @@ object AFTSurvivalRegressionModel extends MLReadable[AFTSurvivalRegressionModel]
 }
 
 /**
- * AFTAggregator computes the gradient and loss for a AFT loss function,
- * as used in AFT survival regression for samples in sparse or dense vector in an online fashion.
- *
- * The loss function and likelihood function under the AFT model based on:
- * Lawless, J. F., Statistical Models and Methods for Lifetime Data,
- * New York: John Wiley & Sons, Inc. 2003.
- *
- * Two AFTAggregator can be merged together to have a summary of loss and gradient of
- * the corresponding joint dataset.
- *
- * Given the values of the covariates $x^{'}$, for random lifetime $t_{i}$ of subjects i = 1,..,n,
- * with possible right-censoring, the likelihood function under the AFT model is given as
- *
- * <blockquote>
- *    $$
- *    L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}
- *      (\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0}
- *    (\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}}
- *    $$
- * </blockquote>
- *
- * Where $\delta_{i}$ is the indicator of the event has occurred i.e. uncensored or not.
- * Using $\epsilon_{i}=\frac{\log{t_{i}}-x^{'}\beta}{\sigma}$, the log-likelihood function
- * assumes the form
- *
- * <blockquote>
- *    $$
- *    \iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+
- *    \delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}]
- *    $$
- * </blockquote>
- * Where $S_{0}(\epsilon_{i})$ is the baseline survivor function,
- * and $f_{0}(\epsilon_{i})$ is corresponding density function.
- *
- * The most commonly used log-linear survival regression method is based on the Weibull
- * distribution of the survival time. The Weibull distribution for lifetime corresponding
- * to extreme value distribution for log of the lifetime,
- * and the $S_{0}(\epsilon)$ function is
- *
- * <blockquote>
- *    $$
- *    S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}})
- *    $$
- * </blockquote>
- *
- * and the $f_{0}(\epsilon_{i})$ function is
- *
- * <blockquote>
- *    $$
- *    f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}})
- *    $$
- * </blockquote>
- *
- * The log-likelihood function for Weibull distribution of lifetime is
- *
- * <blockquote>
- *    $$
- *    \iota(\beta,\sigma)=
- *    -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}]
- *    $$
- * </blockquote>
- *
- * Due to minimizing the negative log-likelihood equivalent to maximum a posteriori probability,
- * the loss function we use to optimize is $-\iota(\beta,\sigma)$.
- * The gradient functions for $\beta$ and $\log\sigma$ respectively are
- *
- * <blockquote>
- *    $$
- *    \frac{\partial (-\iota)}{\partial \beta}=
- *    \sum_{1=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} \\
- *
- *    \frac{\partial (-\iota)}{\partial (\log\sigma)}=
- *    \sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}]
- *    $$
- * </blockquote>
- *
- * @param bcParameters The broadcasted value includes three part: The log of scale parameter,
- *                     the intercept and regression coefficients corresponding to the features.
- * @param fitIntercept Whether to fit an intercept term.
- * @param bcFeaturesStd The broadcast standard deviation values of the features.
- */
-private class AFTAggregator(
-    bcParameters: Broadcast[BDV[Double]],
-    fitIntercept: Boolean,
-    bcFeaturesStd: Broadcast[Array[Double]]) extends Serializable {
-
-  private val length = bcParameters.value.length
-  // make transient so we do not serialize between aggregation stages
-  @transient private lazy val parameters = bcParameters.value
-  // the regression coefficients to the covariates
-  @transient private lazy val coefficients = parameters.slice(2, length)
-  @transient private lazy val intercept = parameters(1)
-  // sigma is the scale parameter of the AFT model
-  @transient private lazy val sigma = math.exp(parameters(0))
-
-  private var totalCnt: Long = 0L
-  private var lossSum = 0.0
-  // Here we optimize loss function over log(sigma), intercept and coefficients
-  private lazy val gradientSumArray = Array.ofDim[Double](length)
-
-  def count: Long = totalCnt
-  def loss: Double = {
-    require(totalCnt > 0.0, s"The number of instances should be " +
-      s"greater than 0.0, but got $totalCnt.")
-    lossSum / totalCnt
-  }
-  def gradient: BDV[Double] = {
-    require(totalCnt > 0.0, s"The number of instances should be " +
-      s"greater than 0.0, but got $totalCnt.")
-    new BDV(gradientSumArray.map(_ / totalCnt.toDouble))
-  }
-
-
-  /**
-   * Add a new training data to this AFTAggregator, and update the loss and gradient
-   * of the objective function.
-   *
-   * @param data The AFTPoint representation for one data point to be added into this aggregator.
-   * @return This AFTAggregator object.
-   */
-  def add(data: AFTPoint): this.type = {
-    val xi = data.features
-    val ti = data.label
-    val delta = data.censor
-
-    require(ti > 0.0, "The lifetime or label should be  greater than 0.")
-
-    val localFeaturesStd = bcFeaturesStd.value
-
-    val margin = {
-      var sum = 0.0
-      xi.foreachNonZero { (index, value) =>
-        if (localFeaturesStd(index) != 0.0) {
-          sum += coefficients(index) * (value / localFeaturesStd(index))
-        }
-      }
-      sum + intercept
-    }
-    val epsilon = (math.log(ti) - margin) / sigma
-
-    lossSum += delta * math.log(sigma) - delta * epsilon + math.exp(epsilon)
-
-    val multiplier = (delta - math.exp(epsilon)) / sigma
-
-    gradientSumArray(0) += delta + multiplier * sigma * epsilon
-    gradientSumArray(1) += { if (fitIntercept) multiplier else 0.0 }
-    xi.foreachNonZero { (index, value) =>
-      if (localFeaturesStd(index) != 0.0) {
-        gradientSumArray(index + 2) += multiplier * (value / localFeaturesStd(index))
-      }
-    }
-
-    totalCnt += 1
-    this
-  }
-
-  /**
-   * Merge another AFTAggregator, and update the loss and gradient
-   * of the objective function.
-   * (Note that it's in place merging; as a result, `this` object will be modified.)
-   *
-   * @param other The other AFTAggregator to be merged.
-   * @return This AFTAggregator object.
-   */
-  def merge(other: AFTAggregator): this.type = {
-    if (other.count != 0) {
-      totalCnt += other.totalCnt
-      lossSum += other.lossSum
-
-      var i = 0
-      while (i < length) {
-        this.gradientSumArray(i) += other.gradientSumArray(i)
-        i += 1
-      }
-    }
-    this
-  }
-}
-
-/**
- * AFTCostFun implements Breeze's DiffFunction[T] for AFT cost.
- * It returns the loss and gradient at a particular point (parameters).
- * It's used in Breeze's convex optimization routines.
- */
-private class AFTCostFun(
-    data: RDD[AFTPoint],
-    fitIntercept: Boolean,
-    bcFeaturesStd: Broadcast[Array[Double]],
-    aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
-
-  override def calculate(parameters: BDV[Double]): (Double, BDV[Double]) = {
-
-    val bcParameters = data.context.broadcast(parameters)
-
-    val aftAggregator = data.treeAggregate(
-      new AFTAggregator(bcParameters, fitIntercept, bcFeaturesStd))(
-      seqOp = (c, v) => (c, v) match {
-        case (aggregator, instance) => aggregator.add(instance)
-      },
-      combOp = (c1, c2) => (c1, c2) match {
-        case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
-      }, depth = aggregationDepth)
-
-    bcParameters.destroy()
-    (aftAggregator.loss, aftAggregator.gradient)
-  }
-}
-
-/**
  * Class that represents the (features, label, censor) of a data point.
  *
  * @param features List of features for this data point.
@@ -650,6 +442,6 @@ private class AFTCostFun(
  * @param censor Indicator of the event has occurred or not. If the value is 1, it means
  *                 the event has occurred i.e. uncensored; otherwise censored.
  */
-private[regression] case class AFTPoint(features: Vector, label: Double, censor: Double) {
+private[ml] case class AFTPoint(features: Vector, label: Double, censor: Double) {
   require(censor == 1.0 || censor == 0.0, "censor of class AFTPoint must be 1.0 or 0.0")
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org