Posted to commits@spark.apache.org by sr...@apache.org on 2017/06/05 09:32:21 UTC
spark git commit: [SPARK-19762][ML] Hierarchy for consolidating ML aggregator/loss code
Repository: spark
Updated Branches:
refs/heads/master 98b5ccd32 -> 1665b5f72
[SPARK-19762][ML] Hierarchy for consolidating ML aggregator/loss code
## What changes were proposed in this pull request?
JIRA: [SPARK-19762](https://issues.apache.org/jira/browse/SPARK-19762)
The larger changes in this patch are:
* Adds a `DifferentiableLossAggregator` trait which is intended to be used as a common parent trait for all Spark ML aggregator classes. It factors out the common members `merge`, `gradient`, `loss`, and `weight` from the aggregator subclasses.
* Adds an `RDDLossFunction` which is intended to be the only implementation of Breeze's `DiffFunction` necessary in Spark ML, and can be used by all other algorithms. It takes the aggregator type as a type parameter and maps the aggregator over an RDD. It also accepts an optional regularization loss function for applying the differentiable part of regularization.
* Factors the regularization out of the data part of the cost function and treats regularization as a separate, independent cost function which can be evaluated and added to the data cost function.
* Changes `LinearRegression` to use this new hierarchy as a proof of concept.
* Adds two new namespaces: `o.a.s.ml.optim.loss` and `o.a.s.ml.optim.aggregator`
Also note that none of these are public-facing changes. All of these classes are internal to Spark ML and remain that way.
**NOTE: The large majority of the "lines added" and "lines deleted" are simply code moving around or unit tests.**
BTW, I also converted LinearSVC to this framework as a way to prove that this new hierarchy is flexible enough for the other algorithms, but I backed those changes out because the PR is large enough as is.
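To make the composition concrete, here is a rough sketch of how the new pieces fit together (essentially what the `LinearRegression` change below does). It is illustrative only: the classes are `private[ml]`, the helper `fitSketch` and its parameters are placeholders for statistics that `LinearRegression` already computes from its summarizers, and the final coefficient un-scaling and closed-form intercept computation are omitted.

```scala
import breeze.linalg.{DenseVector => BDV}
import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS}

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.optim.aggregator.LeastSquaresAggregator
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
import org.apache.spark.rdd.RDD

// Sketch only: these classes are private[ml], so real callers live inside
// org.apache.spark.ml. yStd/yMean/featuresStd/featuresMean stand in for the
// label/feature statistics LinearRegression already computes.
def fitSketch(
    sc: SparkContext,
    instances: RDD[Instance],
    yStd: Double, yMean: Double,
    featuresStd: Array[Double], featuresMean: Array[Double],
    regParam: Double, maxIter: Int, tol: Double): Vector = {
  val bcFeaturesStd = sc.broadcast(featuresStd)
  val bcFeaturesMean = sc.broadcast(featuresMean)

  // The aggregator constructor is curried: everything except the coefficients
  // is fixed up front, and RDDLossFunction supplies a fresh broadcast of the
  // coefficients on every call to `calculate`. (`true` enables fitIntercept.)
  val getAggregator = (bcCoefficients: Broadcast[Vector]) =>
    new LeastSquaresAggregator(yStd, yMean, true, bcFeaturesStd,
      bcFeaturesMean)(bcCoefficients)

  // Regularization is an independent differentiable cost function, added to
  // the data loss inside RDDLossFunction.
  val regularization = if (regParam != 0.0) {
    Some(new L2Regularization(regParam, (_: Int) => true, None))
  } else {
    None
  }

  val costFun = new RDDLossFunction(instances, getAggregator, regularization)
  val lbfgs = new BreezeLBFGS[BDV[Double]](maxIter, 10, tol)
  val solution = lbfgs.minimize(new CachedDiffFunction(costFun),
    BDV.zeros[Double](featuresStd.length))
  Vectors.dense(solution.toArray)
}
```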
## How was this patch tested?
Test suites are added for the new components, and some test suites are also added to provide coverage where there wasn't any before.
* DifferentiableLossAggregatorSuite
* LeastSquaresAggregatorSuite
* RDDLossFunctionSuite
* DifferentiableRegularizationSuite
Below are some performance testing numbers, run on a 6-node virtual cluster with 44 cores and ~110 GB of RAM on a dataset of about 37 GB. These are not "large-scale" tests, but the goal is simply to make sure iteration times don't increase with this patch. Notably, the regularization is computed a bit differently than before, but that should cost very little. I think there's very little risk otherwise, and these numbers don't show a difference. Of course I'm happy to add more tests as we think necessary, but I think the patch is ready for review now.
**Note:** timings are best of 3 runs.
| | numFeatures | numPoints | maxIter | regParam | elasticNetParam | SPARK-19762 (sec) | master (sec) |
|----|---------------|-------------|-----------|------------|-------------------|---------------------|----------------|
| 0 | 5000 | 1e+06 | 30 | 0 | 0 | 129.594 | 131.153 |
| 1 | 5000 | 1e+06 | 30 | 0.1 | 0 | 135.54 | 136.327 |
| 2 | 5000 | 1e+06 | 30 | 0.01 | 0.5 | 135.148 | 129.771 |
| 3 | 50000 | 100000 | 30 | 0 | 0 | 145.764 | 144.096 |
## Follow ups
If this design is accepted, we will convert the other ML algorithms that use this aggregator pattern to the new hierarchy in follow-up PRs. A sketch of the small contract each conversion has to satisfy follows below.
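For reference, a converted algorithm only has to define `dim` and `add`; the trait supplies `merge`, `gradient`, `loss`, and `weight`. Below is a hypothetical, minimal aggregator (plain least squares with no standardization or intercept, similar in spirit to the `TestAggregator` used in the new test suite); it is a sketch, not part of this patch.

```scala
package org.apache.spark.ml.optim.aggregator

import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{BLAS, Vector}

// Hypothetical minimal concrete aggregator: plain (unstandardized) least
// squares with no intercept. Only `dim` and `add` are defined here.
private[ml] class PlainLeastSquaresAggregator(numFeatures: Int)(coefficients: Vector)
  extends DifferentiableLossAggregator[Instance, PlainLeastSquaresAggregator] {

  protected override val dim: Int = numFeatures

  override def add(instance: Instance): PlainLeastSquaresAggregator = {
    // diff = prediction - label; per-instance loss = weight * diff^2 / 2
    val diff = BLAS.dot(coefficients, instance.features) - instance.label
    lossSum += instance.weight * diff * diff / 2.0
    weightSum += instance.weight
    // gradient contribution for coefficient j is weight * diff * x_j
    instance.features.foreachActive { (j, value) =>
      gradientSumArray(j) += instance.weight * diff * value
    }
    this
  }
}
```

Such an aggregator plugs directly into `RDDLossFunction` through a `Broadcast[Vector] => Agg` factory, as in the earlier sketch.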
Author: sethah <se...@gmail.com>
Author: sethah <sh...@cloudera.com>
Closes #17094 from sethah/ml_aggregators.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1665b5f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1665b5f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1665b5f7
Branch: refs/heads/master
Commit: 1665b5f724b486068a62c9c72dfd7ed76807c1b3
Parents: 98b5ccd
Author: sethah <se...@gmail.com>
Authored: Mon Jun 5 10:32:17 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Jun 5 10:32:17 2017 +0100
----------------------------------------------------------------------
.../DifferentiableLossAggregator.scala | 88 +++++
.../aggregator/LeastSquaresAggregator.scala | 224 +++++++++++++
.../loss/DifferentiableRegularization.scala | 71 ++++
.../spark/ml/optim/loss/RDDLossFunction.scala | 72 ++++
.../spark/ml/regression/LinearRegression.scala | 327 +------------------
.../DifferentiableLossAggregatorSuite.scala | 160 +++++++++
.../LeastSquaresAggregatorSuite.scala | 157 +++++++++
.../DifferentiableRegularizationSuite.scala | 61 ++++
.../ml/optim/loss/RDDLossFunctionSuite.scala | 83 +++++
9 files changed, 930 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1665b5f7/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala
new file mode 100644
index 0000000..403c28f
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregator.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
+
+/**
+ * A parent trait for aggregators used in fitting MLlib models. This parent trait implements
+ * some of the common code shared between concrete instances of aggregators. Subclasses of this
+ * aggregator need only implement the `add` method.
+ *
+ * @tparam Datum The type of the instances added to the aggregator to update the loss and gradient.
+ * @tparam Agg Specialization of [[DifferentiableLossAggregator]]. Classes that subclass this
+ * type need to use this parameter to specify the concrete type of the aggregator.
+ */
+private[ml] trait DifferentiableLossAggregator[
+ Datum,
+ Agg <: DifferentiableLossAggregator[Datum, Agg]] extends Serializable {
+
+ self: Agg => // enforce classes that extend this to be the same type as `Agg`
+
+ protected var weightSum: Double = 0.0
+ protected var lossSum: Double = 0.0
+
+ /** The dimension of the gradient array. */
+ protected val dim: Int
+
+ /** Array of gradient values that are mutated when new instances are added to the aggregator. */
+ protected lazy val gradientSumArray: Array[Double] = Array.ofDim[Double](dim)
+
+ /** Add a single data point to this aggregator. */
+ def add(instance: Datum): Agg
+
+ /** Merge two aggregators. The `this` object will be modified in place and returned. */
+ def merge(other: Agg): Agg = {
+ require(dim == other.dim, s"Dimensions mismatch when merging with another " +
+ s"${getClass.getSimpleName}. Expecting $dim but got ${other.dim}.")
+
+ if (other.weightSum != 0) {
+ weightSum += other.weightSum
+ lossSum += other.lossSum
+
+ var i = 0
+ val localThisGradientSumArray = this.gradientSumArray
+ val localOtherGradientSumArray = other.gradientSumArray
+ while (i < dim) {
+ localThisGradientSumArray(i) += localOtherGradientSumArray(i)
+ i += 1
+ }
+ }
+ this
+ }
+
+ /** The current weighted averaged gradient. */
+ def gradient: Vector = {
+ require(weightSum > 0.0, s"The effective number of instances should be " +
+ s"greater than 0.0, but was $weightSum.")
+ val result = Vectors.dense(gradientSumArray.clone())
+ BLAS.scal(1.0 / weightSum, result)
+ result
+ }
+
+ /** Weighted count of instances in this aggregator. */
+ def weight: Double = weightSum
+
+ /** The current loss value of this aggregator. */
+ def loss: Double = {
+ require(weightSum > 0.0, s"The effective number of instances should be " +
+ s"greater than 0.0, but was $weightSum.")
+ lossSum / weightSum
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/spark/blob/1665b5f7/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala
new file mode 100644
index 0000000..1994b0e
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
+
+/**
+ * LeastSquaresAggregator computes the gradient and loss for a Least-squared loss function,
+ * as used in linear regression for samples in sparse or dense vector in an online fashion.
+ *
+ * Two LeastSquaresAggregator can be merged together to have a summary of loss and gradient of
+ * the corresponding joint dataset.
+ *
+ * For improving the convergence rate during the optimization process, and also preventing against
+ * features with very large variances exerting an overly large influence during model training,
+ * package like R's GLMNET performs the scaling to unit variance and removing the mean to reduce
+ * the condition number, and then trains the model in scaled space but returns the coefficients in
+ * the original scale. See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf
+ *
+ * However, we don't want to apply the `StandardScaler` on the training dataset, and then cache
+ * the standardized dataset since it will create a lot of overhead. As a result, we perform the
+ * scaling implicitly when we compute the objective function. The following is the mathematical
+ * derivation.
+ *
+ * Note that we don't deal with intercept by adding bias here, because the intercept
+ * can be computed using closed form after the coefficients are converged.
+ * See this discussion for detail.
+ * http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
+ *
+ * When training with intercept enabled,
+ * The objective function in the scaled space is given by
+ *
+ * <blockquote>
+ * $$
+ * L = 1/2n ||\sum_i w_i(x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2,
+ * $$
+ * </blockquote>
+ *
+ * where $\bar{x_i}$ is the mean of $x_i$, $\hat{x_i}$ is the standard deviation of $x_i$,
+ * $\bar{y}$ is the mean of label, and $\hat{y}$ is the standard deviation of label.
+ *
+ * If we fitting the intercept disabled (that is forced through 0.0),
+ * we can use the same equation except we set $\bar{y}$ and $\bar{x_i}$ to 0 instead
+ * of the respective means.
+ *
+ * This can be rewritten as
+ *
+ * <blockquote>
+ * $$
+ * \begin{align}
+ * L &= 1/2n ||\sum_i (w_i/\hat{x_i})x_i - \sum_i (w_i/\hat{x_i})\bar{x_i} - y / \hat{y}
+ * + \bar{y} / \hat{y}||^2 \\
+ * &= 1/2n ||\sum_i w_i^\prime x_i - y / \hat{y} + offset||^2 = 1/2n diff^2
+ * \end{align}
+ * $$
+ * </blockquote>
+ *
+ * where $w_i^\prime$ is the effective coefficients defined by $w_i/\hat{x_i}$, offset is
+ *
+ * <blockquote>
+ * $$
+ * - \sum_i (w_i/\hat{x_i})\bar{x_i} + \bar{y} / \hat{y}.
+ * $$
+ * </blockquote>
+ *
+ * and diff is
+ *
+ * <blockquote>
+ * $$
+ * \sum_i w_i^\prime x_i - y / \hat{y} + offset
+ * $$
+ * </blockquote>
+ *
+ * Note that the effective coefficients and offset don't depend on training dataset,
+ * so they can be precomputed.
+ *
+ * Now, the first derivative of the objective function in scaled space is
+ *
+ * <blockquote>
+ * $$
+ * \frac{\partial L}{\partial w_i} = diff/N (x_i - \bar{x_i}) / \hat{x_i}
+ * $$
+ * </blockquote>
+ *
+ * However, $(x_i - \bar{x_i})$ will densify the computation, so it's not
+ * an ideal formula when the training dataset is sparse format.
+ *
+ * This can be addressed by adding the dense $\bar{x_i} / \hat{x_i}$ terms
+ * in the end by keeping the sum of diff. The first derivative of total
+ * objective function from all the samples is
+ *
+ *
+ * <blockquote>
+ * $$
+ * \begin{align}
+ * \frac{\partial L}{\partial w_i} &=
+ * 1/N \sum_j diff_j (x_{ij} - \bar{x_i}) / \hat{x_i} \\
+ * &= 1/N ((\sum_j diff_j x_{ij} / \hat{x_i}) - diffSum \bar{x_i} / \hat{x_i}) \\
+ * &= 1/N ((\sum_j diff_j x_{ij} / \hat{x_i}) + correction_i)
+ * \end{align}
+ * $$
+ * </blockquote>
+ *
+ * where $correction_i = - diffSum \bar{x_i} / \hat{x_i}$
+ *
+ * A simple math can show that diffSum is actually zero, so we don't even
+ * need to add the correction terms in the end. From the definition of diff,
+ *
+ * <blockquote>
+ * $$
+ * \begin{align}
+ * diffSum &= \sum_j (\sum_i w_i(x_{ij} - \bar{x_i})
+ * / \hat{x_i} - (y_j - \bar{y}) / \hat{y}) \\
+ * &= N * (\sum_i w_i(\bar{x_i} - \bar{x_i}) / \hat{x_i} - (\bar{y} - \bar{y}) / \hat{y}) \\
+ * &= 0
+ * \end{align}
+ * $$
+ * </blockquote>
+ *
+ * As a result, the first derivative of the total objective function only depends on
+ * the training dataset, which can be easily computed in distributed fashion, and is
+ * sparse format friendly.
+ *
+ * <blockquote>
+ * $$
+ * \frac{\partial L}{\partial w_i} = 1/N ((\sum_j diff_j x_{ij} / \hat{x_i})
+ * $$
+ * </blockquote>
+ *
+ * @note The constructor is curried, since the cost function will repeatedly create new versions
+ * of this class for different coefficient vectors.
+ *
+ * @param labelStd The standard deviation value of the label.
+ * @param labelMean The mean value of the label.
+ * @param fitIntercept Whether to fit an intercept term.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcFeaturesMean The broadcast mean values of the features.
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
+ */
+private[ml] class LeastSquaresAggregator(
+ labelStd: Double,
+ labelMean: Double,
+ fitIntercept: Boolean,
+ bcFeaturesStd: Broadcast[Array[Double]],
+ bcFeaturesMean: Broadcast[Array[Double]])(bcCoefficients: Broadcast[Vector])
+ extends DifferentiableLossAggregator[Instance, LeastSquaresAggregator] {
+ require(labelStd > 0.0, s"${this.getClass.getName} requires the label standard " +
+ s"deviation to be positive.")
+
+ private val numFeatures = bcFeaturesStd.value.length
+ protected override val dim: Int = numFeatures
+ // make transient so we do not serialize between aggregation stages
+ @transient private lazy val featuresStd = bcFeaturesStd.value
+ @transient private lazy val effectiveCoefAndOffset = {
+ val coefficientsArray = bcCoefficients.value.toArray.clone()
+ val featuresMean = bcFeaturesMean.value
+ var sum = 0.0
+ var i = 0
+ val len = coefficientsArray.length
+ while (i < len) {
+ if (featuresStd(i) != 0.0) {
+ coefficientsArray(i) /= featuresStd(i)
+ sum += coefficientsArray(i) * featuresMean(i)
+ } else {
+ coefficientsArray(i) = 0.0
+ }
+ i += 1
+ }
+ val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
+ (Vectors.dense(coefficientsArray), offset)
+ }
+ // do not use tuple assignment above because it will circumvent the @transient tag
+ @transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
+ @transient private lazy val offset = effectiveCoefAndOffset._2
+
+ /**
+ * Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient
+ * of the objective function.
+ *
+ * @param instance The instance of data point to be added.
+ * @return This LeastSquaresAggregator object.
+ */
+ def add(instance: Instance): LeastSquaresAggregator = {
+ instance match { case Instance(label, weight, features) =>
+ require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
+ s" Expecting $numFeatures but got ${features.size}.")
+ require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
+
+ if (weight == 0.0) return this
+
+ val diff = BLAS.dot(features, effectiveCoefficientsVector) - label / labelStd + offset
+
+ if (diff != 0) {
+ val localGradientSumArray = gradientSumArray
+ val localFeaturesStd = featuresStd
+ features.foreachActive { (index, value) =>
+ val fStd = localFeaturesStd(index)
+ if (fStd != 0.0 && value != 0.0) {
+ localGradientSumArray(index) += weight * diff * value / fStd
+ }
+ }
+ lossSum += weight * diff * diff / 2.0
+ }
+ weightSum += weight
+ this
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1665b5f7/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala
new file mode 100644
index 0000000..118c0eb
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.loss
+
+import breeze.optimize.DiffFunction
+
+/**
+ * A Breeze diff function which represents a cost function for differentiable regularization
+ * of parameters. e.g. L2 regularization: 1 / 2 regParam * beta dot beta
+ *
+ * @tparam T The type of the coefficients being regularized.
+ */
+private[ml] trait DifferentiableRegularization[T] extends DiffFunction[T] {
+
+ /** Magnitude of the regularization penalty. */
+ def regParam: Double
+
+}
+
+/**
+ * A Breeze diff function for computing the L2 regularized loss and gradient of an array of
+ * coefficients.
+ *
+ * @param regParam The magnitude of the regularization.
+ * @param shouldApply A function (Int => Boolean) indicating whether a given index should have
+ * regularization applied to it.
+ * @param featuresStd Option indicating whether the regularization should be scaled by the standard
+ * deviation of the features.
+ */
+private[ml] class L2Regularization(
+ val regParam: Double,
+ shouldApply: Int => Boolean,
+ featuresStd: Option[Array[Double]]) extends DifferentiableRegularization[Array[Double]] {
+
+ override def calculate(coefficients: Array[Double]): (Double, Array[Double]) = {
+ var sum = 0.0
+ val gradient = new Array[Double](coefficients.length)
+ coefficients.indices.filter(shouldApply).foreach { j =>
+ val coef = coefficients(j)
+ featuresStd match {
+ case Some(stds) =>
+ val std = stds(j)
+ if (std != 0.0) {
+ val temp = coef / (std * std)
+ sum += coef * temp
+ gradient(j) = regParam * temp
+ } else {
+ 0.0
+ }
+ case None =>
+ sum += coef * coef
+ gradient(j) = coef * regParam
+ }
+ }
+ (0.5 * sum * regParam, gradient)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1665b5f7/mllib/src/main/scala/org/apache/spark/ml/optim/loss/RDDLossFunction.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/RDDLossFunction.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/RDDLossFunction.scala
new file mode 100644
index 0000000..3b1618e
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/RDDLossFunction.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.loss
+
+import scala.reflect.ClassTag
+
+import breeze.linalg.{DenseVector => BDV}
+import breeze.optimize.DiffFunction
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
+import org.apache.spark.ml.optim.aggregator.DifferentiableLossAggregator
+import org.apache.spark.rdd.RDD
+
+/**
+ * This class computes the gradient and loss of a differentiable loss function by mapping a
+ * [[DifferentiableLossAggregator]] over an [[RDD]] of [[Instance]]s. The loss function is the
+ * sum of the loss computed on a single instance across all points in the RDD. Therefore, the actual
+ * analytical form of the loss function is specified by the aggregator, which computes each points
+ * contribution to the overall loss.
+ *
+ * A differentiable regularization component can also be added by providing a
+ * [[DifferentiableRegularization]] loss function.
+ *
+ * @param instances
+ * @param getAggregator A function which gets a new loss aggregator in every tree aggregate step.
+ * @param regularization An option representing the regularization loss function to apply to the
+ * coefficients.
+ * @param aggregationDepth The aggregation depth of the tree aggregation step.
+ * @tparam Agg Specialization of [[DifferentiableLossAggregator]], representing the concrete type
+ * of the aggregator.
+ */
+private[ml] class RDDLossFunction[
+ T: ClassTag,
+ Agg <: DifferentiableLossAggregator[T, Agg]: ClassTag](
+ instances: RDD[T],
+ getAggregator: (Broadcast[Vector] => Agg),
+ regularization: Option[DifferentiableRegularization[Array[Double]]],
+ aggregationDepth: Int = 2)
+ extends DiffFunction[BDV[Double]] {
+
+ override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
+ val bcCoefficients = instances.context.broadcast(Vectors.fromBreeze(coefficients))
+ val thisAgg = getAggregator(bcCoefficients)
+ val seqOp = (agg: Agg, x: T) => agg.add(x)
+ val combOp = (agg1: Agg, agg2: Agg) => agg1.merge(agg2)
+ val newAgg = instances.treeAggregate(thisAgg)(seqOp, combOp, aggregationDepth)
+ val gradient = newAgg.gradient
+ val regLoss = regularization.map { regFun =>
+ val (regLoss, regGradient) = regFun.calculate(coefficients.data)
+ BLAS.axpy(1.0, Vectors.dense(regGradient), gradient)
+ regLoss
+ }.getOrElse(0.0)
+ bcCoefficients.destroy(blocking = false)
+ (newAgg.loss + regLoss, gradient.asBreeze.toDenseVector)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1665b5f7/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index eaad549..db5ac4f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -20,19 +20,20 @@ package org.apache.spark.ml.regression
import scala.collection.mutable
import breeze.linalg.{DenseVector => BDV}
-import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN}
+import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN}
import breeze.stats.distributions.StudentsT
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
-import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.linalg.BLAS._
import org.apache.spark.ml.optim.WeightedLeastSquares
import org.apache.spark.ml.PredictorParams
+import org.apache.spark.ml.optim.aggregator.LeastSquaresAggregator
+import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
@@ -319,8 +320,17 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam
val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
- val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
- $(standardization), bcFeaturesStd, bcFeaturesMean, effectiveL2RegParam, $(aggregationDepth))
+ val getAggregatorFunc = new LeastSquaresAggregator(yStd, yMean, $(fitIntercept),
+ bcFeaturesStd, bcFeaturesMean)(_)
+ val regularization = if (effectiveL2RegParam != 0.0) {
+ val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
+ Some(new L2Regularization(effectiveL2RegParam, shouldApply,
+ if ($(standardization)) None else Some(featuresStd)))
+ } else {
+ None
+ }
+ val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
+ $(aggregationDepth))
val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@@ -793,312 +803,3 @@ class LinearRegressionSummary private[regression] (
}
-/**
- * LeastSquaresAggregator computes the gradient and loss for a Least-squared loss function,
- * as used in linear regression for samples in sparse or dense vector in an online fashion.
- *
- * Two LeastSquaresAggregator can be merged together to have a summary of loss and gradient of
- * the corresponding joint dataset.
- *
- * For improving the convergence rate during the optimization process, and also preventing against
- * features with very large variances exerting an overly large influence during model training,
- * package like R's GLMNET performs the scaling to unit variance and removing the mean to reduce
- * the condition number, and then trains the model in scaled space but returns the coefficients in
- * the original scale. See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf
- *
- * However, we don't want to apply the `StandardScaler` on the training dataset, and then cache
- * the standardized dataset since it will create a lot of overhead. As a result, we perform the
- * scaling implicitly when we compute the objective function. The following is the mathematical
- * derivation.
- *
- * Note that we don't deal with intercept by adding bias here, because the intercept
- * can be computed using closed form after the coefficients are converged.
- * See this discussion for detail.
- * http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
- *
- * When training with intercept enabled,
- * The objective function in the scaled space is given by
- *
- * <blockquote>
- * $$
- * L = 1/2n ||\sum_i w_i(x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2,
- * $$
- * </blockquote>
- *
- * where $\bar{x_i}$ is the mean of $x_i$, $\hat{x_i}$ is the standard deviation of $x_i$,
- * $\bar{y}$ is the mean of label, and $\hat{y}$ is the standard deviation of label.
- *
- * If we fitting the intercept disabled (that is forced through 0.0),
- * we can use the same equation except we set $\bar{y}$ and $\bar{x_i}$ to 0 instead
- * of the respective means.
- *
- * This can be rewritten as
- *
- * <blockquote>
- * $$
- * \begin{align}
- * L &= 1/2n ||\sum_i (w_i/\hat{x_i})x_i - \sum_i (w_i/\hat{x_i})\bar{x_i} - y / \hat{y}
- * + \bar{y} / \hat{y}||^2 \\
- * &= 1/2n ||\sum_i w_i^\prime x_i - y / \hat{y} + offset||^2 = 1/2n diff^2
- * \end{align}
- * $$
- * </blockquote>
- *
- * where $w_i^\prime$ is the effective coefficients defined by $w_i/\hat{x_i}$, offset is
- *
- * <blockquote>
- * $$
- * - \sum_i (w_i/\hat{x_i})\bar{x_i} + \bar{y} / \hat{y}.
- * $$
- * </blockquote>
- *
- * and diff is
- *
- * <blockquote>
- * $$
- * \sum_i w_i^\prime x_i - y / \hat{y} + offset
- * $$
- * </blockquote>
- *
- * Note that the effective coefficients and offset don't depend on training dataset,
- * so they can be precomputed.
- *
- * Now, the first derivative of the objective function in scaled space is
- *
- * <blockquote>
- * $$
- * \frac{\partial L}{\partial w_i} = diff/N (x_i - \bar{x_i}) / \hat{x_i}
- * $$
- * </blockquote>
- *
- * However, $(x_i - \bar{x_i})$ will densify the computation, so it's not
- * an ideal formula when the training dataset is sparse format.
- *
- * This can be addressed by adding the dense $\bar{x_i} / \hat{x_i}$ terms
- * in the end by keeping the sum of diff. The first derivative of total
- * objective function from all the samples is
- *
- *
- * <blockquote>
- * $$
- * \begin{align}
- * \frac{\partial L}{\partial w_i} &=
- * 1/N \sum_j diff_j (x_{ij} - \bar{x_i}) / \hat{x_i} \\
- * &= 1/N ((\sum_j diff_j x_{ij} / \hat{x_i}) - diffSum \bar{x_i} / \hat{x_i}) \\
- * &= 1/N ((\sum_j diff_j x_{ij} / \hat{x_i}) + correction_i)
- * \end{align}
- * $$
- * </blockquote>
- *
- * where $correction_i = - diffSum \bar{x_i} / \hat{x_i}$
- *
- * A simple math can show that diffSum is actually zero, so we don't even
- * need to add the correction terms in the end. From the definition of diff,
- *
- * <blockquote>
- * $$
- * \begin{align}
- * diffSum &= \sum_j (\sum_i w_i(x_{ij} - \bar{x_i})
- * / \hat{x_i} - (y_j - \bar{y}) / \hat{y}) \\
- * &= N * (\sum_i w_i(\bar{x_i} - \bar{x_i}) / \hat{x_i} - (\bar{y} - \bar{y}) / \hat{y}) \\
- * &= 0
- * \end{align}
- * $$
- * </blockquote>
- *
- * As a result, the first derivative of the total objective function only depends on
- * the training dataset, which can be easily computed in distributed fashion, and is
- * sparse format friendly.
- *
- * <blockquote>
- * $$
- * \frac{\partial L}{\partial w_i} = 1/N ((\sum_j diff_j x_{ij} / \hat{x_i})
- * $$
- * </blockquote>
- *
- * @param bcCoefficients The broadcast coefficients corresponding to the features.
- * @param labelStd The standard deviation value of the label.
- * @param labelMean The mean value of the label.
- * @param fitIntercept Whether to fit an intercept term.
- * @param bcFeaturesStd The broadcast standard deviation values of the features.
- * @param bcFeaturesMean The broadcast mean values of the features.
- */
-private class LeastSquaresAggregator(
- bcCoefficients: Broadcast[Vector],
- labelStd: Double,
- labelMean: Double,
- fitIntercept: Boolean,
- bcFeaturesStd: Broadcast[Array[Double]],
- bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable {
-
- private var totalCnt: Long = 0L
- private var weightSum: Double = 0.0
- private var lossSum = 0.0
-
- private val dim = bcCoefficients.value.size
- // make transient so we do not serialize between aggregation stages
- @transient private lazy val featuresStd = bcFeaturesStd.value
- @transient private lazy val effectiveCoefAndOffset = {
- val coefficientsArray = bcCoefficients.value.toArray.clone()
- val featuresMean = bcFeaturesMean.value
- var sum = 0.0
- var i = 0
- val len = coefficientsArray.length
- while (i < len) {
- if (featuresStd(i) != 0.0) {
- coefficientsArray(i) /= featuresStd(i)
- sum += coefficientsArray(i) * featuresMean(i)
- } else {
- coefficientsArray(i) = 0.0
- }
- i += 1
- }
- val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
- (Vectors.dense(coefficientsArray), offset)
- }
- // do not use tuple assignment above because it will circumvent the @transient tag
- @transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
- @transient private lazy val offset = effectiveCoefAndOffset._2
-
- private lazy val gradientSumArray = Array.ofDim[Double](dim)
-
- /**
- * Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient
- * of the objective function.
- *
- * @param instance The instance of data point to be added.
- * @return This LeastSquaresAggregator object.
- */
- def add(instance: Instance): this.type = {
- instance match { case Instance(label, weight, features) =>
-
- if (weight == 0.0) return this
-
- val diff = dot(features, effectiveCoefficientsVector) - label / labelStd + offset
-
- if (diff != 0) {
- val localGradientSumArray = gradientSumArray
- val localFeaturesStd = featuresStd
- features.foreachActive { (index, value) =>
- if (localFeaturesStd(index) != 0.0 && value != 0.0) {
- localGradientSumArray(index) += weight * diff * value / localFeaturesStd(index)
- }
- }
- lossSum += weight * diff * diff / 2.0
- }
-
- totalCnt += 1
- weightSum += weight
- this
- }
- }
-
- /**
- * Merge another LeastSquaresAggregator, and update the loss and gradient
- * of the objective function.
- * (Note that it's in place merging; as a result, `this` object will be modified.)
- *
- * @param other The other LeastSquaresAggregator to be merged.
- * @return This LeastSquaresAggregator object.
- */
- def merge(other: LeastSquaresAggregator): this.type = {
-
- if (other.weightSum != 0) {
- totalCnt += other.totalCnt
- weightSum += other.weightSum
- lossSum += other.lossSum
-
- var i = 0
- val localThisGradientSumArray = this.gradientSumArray
- val localOtherGradientSumArray = other.gradientSumArray
- while (i < dim) {
- localThisGradientSumArray(i) += localOtherGradientSumArray(i)
- i += 1
- }
- }
- this
- }
-
- def count: Long = totalCnt
-
- def loss: Double = {
- require(weightSum > 0.0, s"The effective number of instances should be " +
- s"greater than 0.0, but $weightSum.")
- lossSum / weightSum
- }
-
- def gradient: Vector = {
- require(weightSum > 0.0, s"The effective number of instances should be " +
- s"greater than 0.0, but $weightSum.")
- val result = Vectors.dense(gradientSumArray.clone())
- scal(1.0 / weightSum, result)
- result
- }
-}
-
-/**
- * LeastSquaresCostFun implements Breeze's DiffFunction[T] for Least Squares cost.
- * It returns the loss and gradient with L2 regularization at a particular point (coefficients).
- * It's used in Breeze's convex optimization routines.
- */
-private class LeastSquaresCostFun(
- instances: RDD[Instance],
- labelStd: Double,
- labelMean: Double,
- fitIntercept: Boolean,
- standardization: Boolean,
- bcFeaturesStd: Broadcast[Array[Double]],
- bcFeaturesMean: Broadcast[Array[Double]],
- effectiveL2regParam: Double,
- aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
-
- override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
- val coeffs = Vectors.fromBreeze(coefficients)
- val bcCoeffs = instances.context.broadcast(coeffs)
- val localFeaturesStd = bcFeaturesStd.value
-
- val leastSquaresAggregator = {
- val seqOp = (c: LeastSquaresAggregator, instance: Instance) => c.add(instance)
- val combOp = (c1: LeastSquaresAggregator, c2: LeastSquaresAggregator) => c1.merge(c2)
-
- instances.treeAggregate(
- new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean, fitIntercept, bcFeaturesStd,
- bcFeaturesMean))(seqOp, combOp, aggregationDepth)
- }
-
- val totalGradientArray = leastSquaresAggregator.gradient.toArray
- bcCoeffs.destroy(blocking = false)
-
- val regVal = if (effectiveL2regParam == 0.0) {
- 0.0
- } else {
- var sum = 0.0
- coeffs.foreachActive { (index, value) =>
- // The following code will compute the loss of the regularization; also
- // the gradient of the regularization, and add back to totalGradientArray.
- sum += {
- if (standardization) {
- totalGradientArray(index) += effectiveL2regParam * value
- value * value
- } else {
- if (localFeaturesStd(index) != 0.0) {
- // If `standardization` is false, we still standardize the data
- // to improve the rate of convergence; as a result, we have to
- // perform this reverse standardization by penalizing each component
- // differently to get effectively the same objective function when
- // the training dataset is not standardized.
- val temp = value / (localFeaturesStd(index) * localFeaturesStd(index))
- totalGradientArray(index) += effectiveL2regParam * temp
- value * temp
- } else {
- 0.0
- }
- }
- }
- }
- 0.5 * effectiveL2regParam * sum
- }
-
- (leastSquaresAggregator.loss + regVal, new BDV(totalGradientArray))
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/1665b5f7/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala
new file mode 100644
index 0000000..7a4faeb
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
+import org.apache.spark.ml.util.TestingUtils._
+
+class DifferentiableLossAggregatorSuite extends SparkFunSuite {
+
+ import DifferentiableLossAggregatorSuite.TestAggregator
+
+ private val instances1 = Array(
+ Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
+ Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
+ Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
+ )
+ private val instances2 = Seq(
+ Instance(0.2, 0.4, Vectors.dense(0.8, 2.5)),
+ Instance(0.8, 0.9, Vectors.dense(2.0, 1.3)),
+ Instance(1.5, 0.2, Vectors.dense(3.0, 0.2))
+ )
+
+ private def assertEqual[T, Agg <: DifferentiableLossAggregator[T, Agg]](
+ agg1: DifferentiableLossAggregator[T, Agg],
+ agg2: DifferentiableLossAggregator[T, Agg]): Unit = {
+ assert(agg1.weight === agg2.weight)
+ assert(agg1.loss === agg2.loss)
+ assert(agg1.gradient === agg2.gradient)
+ }
+
+ test("empty aggregator") {
+ val numFeatures = 5
+ val coef = Vectors.dense(Array.fill(numFeatures)(1.0))
+ val agg = new TestAggregator(numFeatures)(coef)
+ withClue("cannot get loss for empty aggregator") {
+ intercept[IllegalArgumentException] {
+ agg.loss
+ }
+ }
+ withClue("cannot get gradient for empty aggregator") {
+ intercept[IllegalArgumentException] {
+ agg.gradient
+ }
+ }
+ }
+
+ test("aggregator initialization") {
+ val numFeatures = 3
+ val coef = Vectors.dense(Array.fill(numFeatures)(1.0))
+ val agg = new TestAggregator(numFeatures)(coef)
+ agg.add(Instance(1.0, 0.3, Vectors.dense(Array.fill(numFeatures)(1.0))))
+ assert(agg.gradient.size === 3)
+ assert(agg.weight === 0.3)
+ }
+
+ test("merge aggregators") {
+ val coefficients = Vectors.dense(0.5, -0.1)
+ val agg1 = new TestAggregator(2)(coefficients)
+ val agg2 = new TestAggregator(2)(coefficients)
+ val aggBadDim = new TestAggregator(1)(Vectors.dense(0.5))
+ aggBadDim.add(Instance(1.0, 1.0, Vectors.dense(1.0)))
+ instances1.foreach(agg1.add)
+
+ // merge incompatible aggregators
+ withClue("cannot merge aggregators with different dimensions") {
+ intercept[IllegalArgumentException] {
+ agg1.merge(aggBadDim)
+ }
+ }
+
+ // merge empty other
+ val mergedEmptyOther = agg1.merge(agg2)
+ assertEqual(mergedEmptyOther, agg1)
+ assert(mergedEmptyOther === agg1)
+
+ // merge empty this
+ val agg3 = new TestAggregator(2)(coefficients)
+ val mergedEmptyThis = agg3.merge(agg1)
+ assertEqual(mergedEmptyThis, agg1)
+ assert(mergedEmptyThis !== agg1)
+
+ instances2.foreach(agg2.add)
+ val (loss1, weight1, grad1) = (agg1.loss, agg1.weight, agg1.gradient)
+ val (loss2, weight2, grad2) = (agg2.loss, agg2.weight, agg2.gradient)
+ val merged = agg1.merge(agg2)
+
+ // check pointers are equal
+ assert(merged === agg1)
+
+ // loss should be weighted average of the two individual losses
+ assert(merged.loss === (loss1 * weight1 + loss2 * weight2) / (weight1 + weight2))
+ assert(merged.weight === weight1 + weight2)
+
+ // gradient should be weighted average of individual gradients
+ val addedGradients = Vectors.dense(grad1.toArray.clone())
+ BLAS.scal(weight1, addedGradients)
+ BLAS.axpy(weight2, grad2, addedGradients)
+ BLAS.scal(1 / (weight1 + weight2), addedGradients)
+ assert(merged.gradient === addedGradients)
+ }
+
+ test("loss, gradient, weight") {
+ val coefficients = Vectors.dense(0.5, -0.1)
+ val agg = new TestAggregator(2)(coefficients)
+ instances1.foreach(agg.add)
+ val errors = instances1.map { case Instance(label, _, features) =>
+ label - BLAS.dot(features, coefficients)
+ }
+ val expectedLoss = errors.zip(instances1).map { case (error: Double, instance: Instance) =>
+ instance.weight * error * error / 2.0
+ }
+ val expectedGradient = Vectors.dense(0.0, 0.0)
+ errors.zip(instances1).foreach { case (error, instance) =>
+ BLAS.axpy(instance.weight * error, instance.features, expectedGradient)
+ }
+ BLAS.scal(1.0 / agg.weight, expectedGradient)
+ val weightSum = instances1.map(_.weight).sum
+
+ assert(agg.weight ~== weightSum relTol 1e-5)
+ assert(agg.loss ~== expectedLoss.sum / weightSum relTol 1e-5)
+ assert(agg.gradient ~== expectedGradient relTol 1e-5)
+ }
+}
+
+object DifferentiableLossAggregatorSuite {
+ /**
+ * Dummy aggregator that represents least squares cost with no intercept.
+ */
+ class TestAggregator(numFeatures: Int)(coefficients: Vector)
+ extends DifferentiableLossAggregator[Instance, TestAggregator] {
+
+ protected override val dim: Int = numFeatures
+
+ override def add(instance: Instance): TestAggregator = {
+ val error = instance.label - BLAS.dot(coefficients, instance.features)
+ weightSum += instance.weight
+ lossSum += instance.weight * error * error / 2.0
+ (0 until dim).foreach { j =>
+ gradientSumArray(j) += instance.weight * error * instance.features(j)
+ }
+ this
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1665b5f7/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala
new file mode 100644
index 0000000..d1cb0d3
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
+import org.apache.spark.ml.util.TestingUtils._
+import org.apache.spark.mllib.linalg.VectorImplicits._
+import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+
+class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
+
+ @transient var instances: Array[Instance] = _
+ @transient var instancesConstantFeature: Array[Instance] = _
+ @transient var instancesConstantLabel: Array[Instance] = _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ instances = Array(
+ Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
+ Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
+ Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
+ )
+ instancesConstantFeature = Array(
+ Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
+ Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
+ Instance(2.0, 0.3, Vectors.dense(1.0, 0.5))
+ )
+ instancesConstantLabel = Array(
+ Instance(1.0, 0.1, Vectors.dense(1.0, 2.0)),
+ Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
+ Instance(1.0, 0.3, Vectors.dense(4.0, 0.5))
+ )
+ }
+
+ /** Get feature and label summarizers for provided data. */
+ def getSummarizers(
+ instances: Array[Instance]): (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer) = {
+ val seqOp = (c: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer),
+ instance: Instance) =>
+ (c._1.add(instance.features, instance.weight),
+ c._2.add(Vectors.dense(instance.label), instance.weight))
+
+ val combOp = (c1: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer),
+ c2: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer)) =>
+ (c1._1.merge(c2._1), c1._2.merge(c2._2))
+
+ instances.aggregate(
+ new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer
+ )(seqOp, combOp)
+ }
+
+ /** Get summary statistics for some data and create a new LeastSquaresAggregator. */
+ def getNewAggregator(
+ instances: Array[Instance],
+ coefficients: Vector,
+ fitIntercept: Boolean): LeastSquaresAggregator = {
+ val (featuresSummarizer, ySummarizer) = getSummarizers(instances)
+ val yStd = math.sqrt(ySummarizer.variance(0))
+ val yMean = ySummarizer.mean(0)
+ val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+ val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
+ val featuresMean = featuresSummarizer.mean
+ val bcFeaturesMean = spark.sparkContext.broadcast(featuresMean.toArray)
+ val bcCoefficients = spark.sparkContext.broadcast(coefficients)
+ new LeastSquaresAggregator(yStd, yMean, fitIntercept, bcFeaturesStd,
+ bcFeaturesMean)(bcCoefficients)
+ }
+
+ test("check sizes") {
+ val coefficients = Vectors.dense(1.0, 2.0)
+ val aggIntercept = getNewAggregator(instances, coefficients, fitIntercept = true)
+ val aggNoIntercept = getNewAggregator(instances, coefficients, fitIntercept = false)
+ instances.foreach(aggIntercept.add)
+ instances.foreach(aggNoIntercept.add)
+
+ // least squares agg does not include intercept in its gradient array
+ assert(aggIntercept.gradient.size === 2)
+ assert(aggNoIntercept.gradient.size === 2)
+ }
+
+ test("check correctness") {
+ /*
+ Check that the aggregator computes loss/gradient for:
+ 0.5 * sum_i=1^N ([sum_j=1^D beta_j * ((x_j - x_j,bar) / sigma_j)] - ((y - ybar) / sigma_y))^2
+ */
+ val coefficients = Vectors.dense(1.0, 2.0)
+ val numFeatures = coefficients.size
+ val (featuresSummarizer, ySummarizer) = getSummarizers(instances)
+ val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+ val featuresMean = featuresSummarizer.mean.toArray
+ val yStd = math.sqrt(ySummarizer.variance(0))
+ val yMean = ySummarizer.mean(0)
+
+ val agg = getNewAggregator(instances, coefficients, fitIntercept = true)
+ instances.foreach(agg.add)
+
+ // compute (y - pred) analytically
+ val errors = instances.map { case Instance(l, w, f) =>
+ val scaledFeatures = (0 until numFeatures).map { j =>
+ (f.toArray(j) - featuresMean(j)) / featuresStd(j)
+ }.toArray
+ val scaledLabel = (l - yMean) / yStd
+ BLAS.dot(coefficients, Vectors.dense(scaledFeatures)) - scaledLabel
+ }
+
+ // compute expected loss sum analytically
+ val expectedLoss = errors.zip(instances).map { case (error, instance) =>
+ instance.weight * error * error / 2.0
+ }
+
+ // compute gradient analytically from instances
+ val expectedGradient = Vectors.dense(0.0, 0.0)
+ errors.zip(instances).foreach { case (error, instance) =>
+ val scaledFeatures = (0 until numFeatures).map { j =>
+ instance.weight * instance.features.toArray(j) / featuresStd(j)
+ }.toArray
+ BLAS.axpy(error, Vectors.dense(scaledFeatures), expectedGradient)
+ }
+
+ val weightSum = instances.map(_.weight).sum
+ BLAS.scal(1.0 / weightSum, expectedGradient)
+ assert(agg.loss ~== (expectedLoss.sum / weightSum) relTol 1e-5)
+ assert(agg.gradient ~== expectedGradient relTol 1e-5)
+ }
+
+ test("check with zero standard deviation") {
+ val coefficients = Vectors.dense(1.0, 2.0)
+ val aggConstantFeature = getNewAggregator(instancesConstantFeature, coefficients,
+ fitIntercept = true)
+ instances.foreach(aggConstantFeature.add)
+ // constant features should not affect gradient
+ assert(aggConstantFeature.gradient(0) === 0.0)
+
+ withClue("LeastSquaresAggregator does not support zero standard deviation of the label") {
+ intercept[IllegalArgumentException] {
+ getNewAggregator(instancesConstantLabel, coefficients, fitIntercept = true)
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1665b5f7/mllib/src/test/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularizationSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularizationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularizationSuite.scala
new file mode 100644
index 0000000..0794417
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularizationSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.loss
+
+import org.apache.spark.SparkFunSuite
+
+class DifferentiableRegularizationSuite extends SparkFunSuite {
+
+ test("L2 regularization") {
+ val shouldApply = (_: Int) => true
+ val regParam = 0.3
+ val coefficients = Array(1.0, 3.0, -2.0)
+ val numFeatures = coefficients.size
+
+ // check without features standard
+ val regFun = new L2Regularization(regParam, shouldApply, None)
+ val (loss, grad) = regFun.calculate(coefficients)
+ assert(loss === 0.5 * regParam * coefficients.map(x => x * x).sum)
+ assert(grad === coefficients.map(_ * regParam))
+
+ // check with features standard
+ val featuresStd = Array(0.1, 1.1, 0.5)
+ val regFunStd = new L2Regularization(regParam, shouldApply, Some(featuresStd))
+ val (lossStd, gradStd) = regFunStd.calculate(coefficients)
+ val expectedLossStd = 0.5 * regParam * (0 until numFeatures).map { j =>
+ coefficients(j) * coefficients(j) / (featuresStd(j) * featuresStd(j))
+ }.sum
+ val expectedGradientStd = (0 until numFeatures).map { j =>
+ regParam * coefficients(j) / (featuresStd(j) * featuresStd(j))
+ }.toArray
+ assert(lossStd === expectedLossStd)
+ assert(gradStd === expectedGradientStd)
+
+ // check should apply
+ val shouldApply2 = (i: Int) => i == 1
+ val regFunApply = new L2Regularization(regParam, shouldApply2, None)
+ val (lossApply, gradApply) = regFunApply.calculate(coefficients)
+ assert(lossApply === 0.5 * regParam * coefficients(1) * coefficients(1))
+ assert(gradApply === Array(0.0, coefficients(1) * regParam, 0.0))
+
+ // check with zero features standard
+ val featuresStdZero = Array(0.1, 0.0, 0.5)
+ val regFunStdZero = new L2Regularization(regParam, shouldApply, Some(featuresStdZero))
+ val (_, gradStdZero) = regFunStdZero.calculate(coefficients)
+ assert(gradStdZero(1) == 0.0)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1665b5f7/mllib/src/test/scala/org/apache/spark/ml/optim/loss/RDDLossFunctionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/loss/RDDLossFunctionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/loss/RDDLossFunctionSuite.scala
new file mode 100644
index 0000000..cd5cebe
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/loss/RDDLossFunctionSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.loss
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
+import org.apache.spark.ml.optim.aggregator.DifferentiableLossAggregatorSuite.TestAggregator
+import org.apache.spark.ml.util.TestingUtils._
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.rdd.RDD
+
+class RDDLossFunctionSuite extends SparkFunSuite with MLlibTestSparkContext {
+
+ @transient var instances: RDD[Instance] = _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ instances = sc.parallelize(Seq(
+ Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
+ Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
+ Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
+ ))
+ }
+
+ test("regularization") {
+ val coefficients = Vectors.dense(0.5, -0.1)
+ val regLossFun = new L2Regularization(0.1, (_: Int) => true, None)
+ val getAgg = (bvec: Broadcast[Vector]) => new TestAggregator(2)(bvec.value)
+ val lossNoReg = new RDDLossFunction(instances, getAgg, None)
+ val lossWithReg = new RDDLossFunction(instances, getAgg, Some(regLossFun))
+
+ val (loss1, grad1) = lossNoReg.calculate(coefficients.asBreeze.toDenseVector)
+ val (regLoss, regGrad) = regLossFun.calculate(coefficients.toArray)
+ val (loss2, grad2) = lossWithReg.calculate(coefficients.asBreeze.toDenseVector)
+
+ BLAS.axpy(1.0, Vectors.fromBreeze(grad1), Vectors.dense(regGrad))
+ assert(Vectors.dense(regGrad) ~== Vectors.fromBreeze(grad2) relTol 1e-5)
+ assert(loss1 + regLoss === loss2)
+ }
+
+ test("empty RDD") {
+ val rdd = sc.parallelize(Seq.empty[Instance])
+ val coefficients = Vectors.dense(0.5, -0.1)
+ val getAgg = (bv: Broadcast[Vector]) => new TestAggregator(2)(bv.value)
+ val lossFun = new RDDLossFunction(rdd, getAgg, None)
+ withClue("cannot calculate cost for empty dataset") {
+ intercept[IllegalArgumentException]{
+ lossFun.calculate(coefficients.asBreeze.toDenseVector)
+ }
+ }
+ }
+
+ test("versus aggregating on an iterable") {
+ val coefficients = Vectors.dense(0.5, -0.1)
+ val getAgg = (bv: Broadcast[Vector]) => new TestAggregator(2)(bv.value)
+ val lossFun = new RDDLossFunction(instances, getAgg, None)
+ val (loss, grad) = lossFun.calculate(coefficients.asBreeze.toDenseVector)
+
+ // just map the aggregator over the instances array
+ val agg = new TestAggregator(2)(coefficients)
+ instances.collect().foreach(agg.add)
+
+ assert(loss === agg.loss)
+ assert(Vectors.fromBreeze(grad) === agg.gradient)
+ }
+
+}