You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2020/05/05 13:36:26 UTC
[spark] branch master updated: [SPARK-31603][ML] AFT uses common
functions in RDDLossFunction
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 701deac [SPARK-31603][ML] AFT uses common functions in RDDLossFunction
701deac is described below
commit 701deac88d09690ddf9d28b9c79814aecfd3277d
Author: zhengruifeng <ru...@foxmail.com>
AuthorDate: Tue May 5 08:35:20 2020 -0500
[SPARK-31603][ML] AFT uses common functions in RDDLossFunction
### What changes were proposed in this pull request?
1, make AFT reuse common functions in `ml.optim`, rather than making its own impl.
### Why are the changes needed?
The logic in optimizing AFT is quite similar to other algorithms like other algs based on `RDDLossFunction`,
We should reuse the common functions.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
existing testsuites
Closes #28404 from zhengruifeng/mv_aft_optim.
Authored-by: zhengruifeng <ru...@foxmail.com>
Signed-off-by: Sean Owen <sr...@gmail.com>
---
.../spark/ml/optim/aggregator/AFTAggregator.scala | 162 +++++++++++++++
.../aggregator/DifferentiableLossAggregator.scala | 9 +-
.../ml/regression/AFTSurvivalRegression.scala | 228 +--------------------
3 files changed, 173 insertions(+), 226 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala
new file mode 100644
index 0000000..6482c61
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.regression.AFTPoint
+
+/**
+ * AFTAggregator computes the gradient and loss for a AFT loss function,
+ * as used in AFT survival regression for samples in sparse or dense vector in an online fashion.
+ *
+ * The loss function and likelihood function under the AFT model based on:
+ * Lawless, J. F., Statistical Models and Methods for Lifetime Data,
+ * New York: John Wiley & Sons, Inc. 2003.
+ *
+ * Two AFTAggregator can be merged together to have a summary of loss and gradient of
+ * the corresponding joint dataset.
+ *
+ * Given the values of the covariates $x^{'}$, for random lifetime $t_{i}$ of subjects i = 1,..,n,
+ * with possible right-censoring, the likelihood function under the AFT model is given as
+ *
+ * <blockquote>
+ * $$
+ * L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}
+ * (\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0}
+ * (\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}}
+ * $$
+ * </blockquote>
+ *
+ * Where $\delta_{i}$ is the indicator of the event has occurred i.e. uncensored or not.
+ * Using $\epsilon_{i}=\frac{\log{t_{i}}-x^{'}\beta}{\sigma}$, the log-likelihood function
+ * assumes the form
+ *
+ * <blockquote>
+ * $$
+ * \iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+
+ * \delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}]
+ * $$
+ * </blockquote>
+ * Where $S_{0}(\epsilon_{i})$ is the baseline survivor function,
+ * and $f_{0}(\epsilon_{i})$ is corresponding density function.
+ *
+ * The most commonly used log-linear survival regression method is based on the Weibull
+ * distribution of the survival time. The Weibull distribution for lifetime corresponding
+ * to extreme value distribution for log of the lifetime,
+ * and the $S_{0}(\epsilon)$ function is
+ *
+ * <blockquote>
+ * $$
+ * S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}})
+ * $$
+ * </blockquote>
+ *
+ * and the $f_{0}(\epsilon_{i})$ function is
+ *
+ * <blockquote>
+ * $$
+ * f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}})
+ * $$
+ * </blockquote>
+ *
+ * The log-likelihood function for Weibull distribution of lifetime is
+ *
+ * <blockquote>
+ * $$
+ * \iota(\beta,\sigma)=
+ * -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}]
+ * $$
+ * </blockquote>
+ *
+ * Due to minimizing the negative log-likelihood equivalent to maximum a posteriori probability,
+ * the loss function we use to optimize is $-\iota(\beta,\sigma)$.
+ * The gradient functions for $\beta$ and $\log\sigma$ respectively are
+ *
+ * <blockquote>
+ * $$
+ * \frac{\partial (-\iota)}{\partial \beta}=
+ * \sum_{1=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} \\
+ *
+ * \frac{\partial (-\iota)}{\partial (\log\sigma)}=
+ * \sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}]
+ * $$
+ * </blockquote>
+ *
+ * @param bcCoefficients The broadcasted value includes three part: 1, regression coefficients
+ * corresponding to the features; 2, the intercept; 3, the log of scale
+ * parameter.
+ * @param fitIntercept Whether to fit an intercept term.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ */
+
+private[ml] class AFTAggregator(
+ bcFeaturesStd: Broadcast[Array[Double]],
+ fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
+ extends DifferentiableLossAggregator[AFTPoint, AFTAggregator] {
+
+ protected override val dim: Int = bcCoefficients.value.size
+
+ /**
+ * Add a new training data to this AFTAggregator, and update the loss and gradient
+ * of the objective function.
+ *
+ * @param data The AFTPoint representation for one data point to be added into this aggregator.
+ * @return This AFTAggregator object.
+ */
+ def add(data: AFTPoint): this.type = {
+ val coefficients = bcCoefficients.value.toArray
+ val intercept = coefficients(dim - 2)
+ // sigma is the scale parameter of the AFT model
+ val sigma = math.exp(coefficients(dim - 1))
+
+ val xi = data.features
+ val ti = data.label
+ val delta = data.censor
+
+ require(ti > 0.0, "The lifetime or label should be greater than 0.")
+
+ val localFeaturesStd = bcFeaturesStd.value
+
+ val margin = {
+ var sum = 0.0
+ xi.foreachNonZero { (index, value) =>
+ if (localFeaturesStd(index) != 0.0) {
+ sum += coefficients(index) * (value / localFeaturesStd(index))
+ }
+ }
+ sum + intercept
+ }
+ val epsilon = (math.log(ti) - margin) / sigma
+
+ lossSum += delta * math.log(sigma) - delta * epsilon + math.exp(epsilon)
+
+ val multiplier = (delta - math.exp(epsilon)) / sigma
+
+ xi.foreachNonZero { (index, value) =>
+ if (localFeaturesStd(index) != 0.0) {
+ gradientSumArray(index) += multiplier * (value / localFeaturesStd(index))
+ }
+ }
+ gradientSumArray(dim - 2) += { if (fitIntercept) multiplier else 0.0 }
+ gradientSumArray(dim - 1) += delta + multiplier * sigma * epsilon
+
+ weightSum += 1.0
+ this
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala
index 403c28f..b247723 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala
@@ -53,14 +53,7 @@ private[ml] trait DifferentiableLossAggregator[
if (other.weightSum != 0) {
weightSum += other.weightSum
lossSum += other.lossSum
-
- var i = 0
- val localThisGradientSumArray = this.gradientSumArray
- val localOtherGradientSumArray = other.gradientSumArray
- while (i < dim) {
- localThisGradientSumArray(i) += localOtherGradientSumArray(i)
- i += 1
- }
+ BLAS.getBLAS(dim).daxpy(dim, 1.0, other.gradientSumArray, 1, this.gradientSumArray, 1)
}
this
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index 2da65a0..8cc5f86 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -20,15 +20,16 @@ package org.apache.spark.ml.regression
import scala.collection.mutable
import breeze.linalg.{DenseVector => BDV}
-import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS}
+import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS}
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
-import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
+import org.apache.spark.ml.optim.aggregator.AFTAggregator
+import org.apache.spark.ml.optim.loss.RDDLossFunction
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.stat._
@@ -208,7 +209,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
)
val featuresStd = featuresSummarizer.std.toArray
- val numFeatures = featuresStd.size
+ val numFeatures = featuresStd.length
instr.logPipelineStage(this)
instr.logDataset(dataset)
@@ -226,9 +227,9 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
}
val bcFeaturesStd = instances.context.broadcast(featuresStd)
-
- val costFun = new AFTCostFun(instances, $(fitIntercept), bcFeaturesStd, $(aggregationDepth))
+ val getAggregatorFunc = new AFTAggregator(bcFeaturesStd, $(fitIntercept))(_)
val optimizer = new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
+ val costFun = new RDDLossFunction(instances, getAggregatorFunc, None, $(aggregationDepth))
/*
The parameters vector has three parts:
@@ -258,15 +259,15 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
bcFeaturesStd.destroy()
if (handlePersistence) instances.unpersist()
- val rawCoefficients = parameters.slice(2, parameters.length)
+ val rawCoefficients = parameters.take(numFeatures)
var i = 0
while (i < numFeatures) {
rawCoefficients(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
i += 1
}
val coefficients = Vectors.dense(rawCoefficients)
- val intercept = parameters(1)
- val scale = math.exp(parameters(0))
+ val intercept = parameters(numFeatures)
+ val scale = math.exp(parameters(numFeatures + 1))
new AFTSurvivalRegressionModel(uid, coefficients, intercept, scale)
}
@@ -434,215 +435,6 @@ object AFTSurvivalRegressionModel extends MLReadable[AFTSurvivalRegressionModel]
}
/**
- * AFTAggregator computes the gradient and loss for a AFT loss function,
- * as used in AFT survival regression for samples in sparse or dense vector in an online fashion.
- *
- * The loss function and likelihood function under the AFT model based on:
- * Lawless, J. F., Statistical Models and Methods for Lifetime Data,
- * New York: John Wiley & Sons, Inc. 2003.
- *
- * Two AFTAggregator can be merged together to have a summary of loss and gradient of
- * the corresponding joint dataset.
- *
- * Given the values of the covariates $x^{'}$, for random lifetime $t_{i}$ of subjects i = 1,..,n,
- * with possible right-censoring, the likelihood function under the AFT model is given as
- *
- * <blockquote>
- * $$
- * L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}
- * (\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0}
- * (\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}}
- * $$
- * </blockquote>
- *
- * Where $\delta_{i}$ is the indicator of the event has occurred i.e. uncensored or not.
- * Using $\epsilon_{i}=\frac{\log{t_{i}}-x^{'}\beta}{\sigma}$, the log-likelihood function
- * assumes the form
- *
- * <blockquote>
- * $$
- * \iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+
- * \delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}]
- * $$
- * </blockquote>
- * Where $S_{0}(\epsilon_{i})$ is the baseline survivor function,
- * and $f_{0}(\epsilon_{i})$ is corresponding density function.
- *
- * The most commonly used log-linear survival regression method is based on the Weibull
- * distribution of the survival time. The Weibull distribution for lifetime corresponding
- * to extreme value distribution for log of the lifetime,
- * and the $S_{0}(\epsilon)$ function is
- *
- * <blockquote>
- * $$
- * S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}})
- * $$
- * </blockquote>
- *
- * and the $f_{0}(\epsilon_{i})$ function is
- *
- * <blockquote>
- * $$
- * f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}})
- * $$
- * </blockquote>
- *
- * The log-likelihood function for Weibull distribution of lifetime is
- *
- * <blockquote>
- * $$
- * \iota(\beta,\sigma)=
- * -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}]
- * $$
- * </blockquote>
- *
- * Due to minimizing the negative log-likelihood equivalent to maximum a posteriori probability,
- * the loss function we use to optimize is $-\iota(\beta,\sigma)$.
- * The gradient functions for $\beta$ and $\log\sigma$ respectively are
- *
- * <blockquote>
- * $$
- * \frac{\partial (-\iota)}{\partial \beta}=
- * \sum_{1=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} \\
- *
- * \frac{\partial (-\iota)}{\partial (\log\sigma)}=
- * \sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}]
- * $$
- * </blockquote>
- *
- * @param bcParameters The broadcasted value includes three part: The log of scale parameter,
- * the intercept and regression coefficients corresponding to the features.
- * @param fitIntercept Whether to fit an intercept term.
- * @param bcFeaturesStd The broadcast standard deviation values of the features.
- */
-private class AFTAggregator(
- bcParameters: Broadcast[BDV[Double]],
- fitIntercept: Boolean,
- bcFeaturesStd: Broadcast[Array[Double]]) extends Serializable {
-
- private val length = bcParameters.value.length
- // make transient so we do not serialize between aggregation stages
- @transient private lazy val parameters = bcParameters.value
- // the regression coefficients to the covariates
- @transient private lazy val coefficients = parameters.slice(2, length)
- @transient private lazy val intercept = parameters(1)
- // sigma is the scale parameter of the AFT model
- @transient private lazy val sigma = math.exp(parameters(0))
-
- private var totalCnt: Long = 0L
- private var lossSum = 0.0
- // Here we optimize loss function over log(sigma), intercept and coefficients
- private lazy val gradientSumArray = Array.ofDim[Double](length)
-
- def count: Long = totalCnt
- def loss: Double = {
- require(totalCnt > 0.0, s"The number of instances should be " +
- s"greater than 0.0, but got $totalCnt.")
- lossSum / totalCnt
- }
- def gradient: BDV[Double] = {
- require(totalCnt > 0.0, s"The number of instances should be " +
- s"greater than 0.0, but got $totalCnt.")
- new BDV(gradientSumArray.map(_ / totalCnt.toDouble))
- }
-
-
- /**
- * Add a new training data to this AFTAggregator, and update the loss and gradient
- * of the objective function.
- *
- * @param data The AFTPoint representation for one data point to be added into this aggregator.
- * @return This AFTAggregator object.
- */
- def add(data: AFTPoint): this.type = {
- val xi = data.features
- val ti = data.label
- val delta = data.censor
-
- require(ti > 0.0, "The lifetime or label should be greater than 0.")
-
- val localFeaturesStd = bcFeaturesStd.value
-
- val margin = {
- var sum = 0.0
- xi.foreachNonZero { (index, value) =>
- if (localFeaturesStd(index) != 0.0) {
- sum += coefficients(index) * (value / localFeaturesStd(index))
- }
- }
- sum + intercept
- }
- val epsilon = (math.log(ti) - margin) / sigma
-
- lossSum += delta * math.log(sigma) - delta * epsilon + math.exp(epsilon)
-
- val multiplier = (delta - math.exp(epsilon)) / sigma
-
- gradientSumArray(0) += delta + multiplier * sigma * epsilon
- gradientSumArray(1) += { if (fitIntercept) multiplier else 0.0 }
- xi.foreachNonZero { (index, value) =>
- if (localFeaturesStd(index) != 0.0) {
- gradientSumArray(index + 2) += multiplier * (value / localFeaturesStd(index))
- }
- }
-
- totalCnt += 1
- this
- }
-
- /**
- * Merge another AFTAggregator, and update the loss and gradient
- * of the objective function.
- * (Note that it's in place merging; as a result, `this` object will be modified.)
- *
- * @param other The other AFTAggregator to be merged.
- * @return This AFTAggregator object.
- */
- def merge(other: AFTAggregator): this.type = {
- if (other.count != 0) {
- totalCnt += other.totalCnt
- lossSum += other.lossSum
-
- var i = 0
- while (i < length) {
- this.gradientSumArray(i) += other.gradientSumArray(i)
- i += 1
- }
- }
- this
- }
-}
-
-/**
- * AFTCostFun implements Breeze's DiffFunction[T] for AFT cost.
- * It returns the loss and gradient at a particular point (parameters).
- * It's used in Breeze's convex optimization routines.
- */
-private class AFTCostFun(
- data: RDD[AFTPoint],
- fitIntercept: Boolean,
- bcFeaturesStd: Broadcast[Array[Double]],
- aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
-
- override def calculate(parameters: BDV[Double]): (Double, BDV[Double]) = {
-
- val bcParameters = data.context.broadcast(parameters)
-
- val aftAggregator = data.treeAggregate(
- new AFTAggregator(bcParameters, fitIntercept, bcFeaturesStd))(
- seqOp = (c, v) => (c, v) match {
- case (aggregator, instance) => aggregator.add(instance)
- },
- combOp = (c1, c2) => (c1, c2) match {
- case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
- }, depth = aggregationDepth)
-
- bcParameters.destroy()
- (aftAggregator.loss, aftAggregator.gradient)
- }
-}
-
-/**
* Class that represents the (features, label, censor) of a data point.
*
* @param features List of features for this data point.
@@ -650,6 +442,6 @@ private class AFTCostFun(
* @param censor Indicator of the event has occurred or not. If the value is 1, it means
* the event has occurred i.e. uncensored; otherwise censored.
*/
-private[regression] case class AFTPoint(features: Vector, label: Double, censor: Double) {
+private[ml] case class AFTPoint(features: Vector, label: Double, censor: Double) {
require(censor == 1.0 || censor == 0.0, "censor of class AFTPoint must be 1.0 or 0.0")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org