Posted to commits@spark.apache.org by yl...@apache.org on 2017/08/25 02:22:33 UTC
spark git commit: [SPARK-21108][ML] convert LinearSVC to aggregator framework
Repository: spark
Updated Branches:
refs/heads/master 05af2de0f -> f3676d639
[SPARK-21108][ML] convert LinearSVC to aggregator framework
## What changes were proposed in this pull request?
Convert LinearSVC to the new aggregator framework (a sketch of the new wiring follows below).
## How was this patch tested?
Existing unit tests.
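For readers new to the framework: the net effect of the patch is that LinearSVC no longer carries its own Breeze DiffFunction and aggregator, and composes the shared pieces instead. A minimal sketch of the new wiring, assuming `instances`, `featuresStd`, `numFeatures`, and the usual params (`regParamL2`, `standardization`, `fitIntercept`, `aggregationDepth`) are in scope as in LinearSVC.train; all names are taken from the diff below:

    import org.apache.spark.ml.optim.aggregator.HingeAggregator
    import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}

    // Standardization constants are broadcast once and shared by all tasks.
    val bcFeaturesStd = instances.context.broadcast(featuresStd)

    // Optional L2 penalty; the intercept slot (index >= numFeatures) is excluded.
    val regularization = if (regParamL2 != 0.0) {
      val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
      Some(new L2Regularization(regParamL2, shouldApply,
        if (standardization) None else Some((j: Int) => featuresStd(j))))
    } else {
      None
    }

    // Currying: fixing the data-dependent arguments first yields the
    // Broadcast[Vector] => HingeAggregator factory that RDDLossFunction expects.
    val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, fitIntercept)(_)
    val costFun = new RDDLossFunction(instances, getAggregatorFunc,
      regularization, aggregationDepth)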
Author: Yuhao Yang <yu...@intel.com>
Closes #18315 from hhbyyh/svcAggregator.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3676d63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3676d63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3676d63
Branch: refs/heads/master
Commit: f3676d63913e0706e071b71e1742b8d57b102fba
Parents: 05af2de
Author: Yuhao Yang <yu...@intel.com>
Authored: Fri Aug 25 10:22:27 2017 +0800
Committer: Yanbo Liang <yb...@gmail.com>
Committed: Fri Aug 25 10:22:27 2017 +0800
----------------------------------------------------------------------
.../spark/ml/classification/LinearSVC.scala | 204 ++-----------------
.../ml/optim/aggregator/HingeAggregator.scala | 105 ++++++++++
.../ml/classification/LinearSVCSuite.scala | 7 +-
.../optim/aggregator/HingeAggregatorSuite.scala | 163 +++++++++++++++
.../aggregator/LogisticAggregatorSuite.scala | 2 -
5 files changed, 286 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index 8d556de..3b0666c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -25,11 +25,11 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
-import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._
-import org.apache.spark.ml.linalg.BLAS._
+import org.apache.spark.ml.optim.aggregator.HingeAggregator
+import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
@@ -214,10 +214,20 @@ class LinearSVC @Since("2.2.0") (
}
val featuresStd = summarizer.variance.toArray.map(math.sqrt)
+ val getFeaturesStd = (j: Int) => featuresStd(j)
val regParamL2 = $(regParam)
val bcFeaturesStd = instances.context.broadcast(featuresStd)
- val costFun = new LinearSVCCostFun(instances, $(fitIntercept),
- $(standardization), bcFeaturesStd, regParamL2, $(aggregationDepth))
+ val regularization = if (regParamL2 != 0.0) {
+ val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
+ Some(new L2Regularization(regParamL2, shouldApply,
+ if ($(standardization)) None else Some(getFeaturesStd)))
+ } else {
+ None
+ }
+
+ val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
+ val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
+ $(aggregationDepth))
def regParamL1Fun = (index: Int) => 0D
val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
@@ -372,189 +382,3 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] {
}
}
}
-
-/**
- * LinearSVCCostFun implements Breeze's DiffFunction[T] for hinge loss function
- */
-private class LinearSVCCostFun(
- instances: RDD[Instance],
- fitIntercept: Boolean,
- standardization: Boolean,
- bcFeaturesStd: Broadcast[Array[Double]],
- regParamL2: Double,
- aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
-
- override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
- val coeffs = Vectors.fromBreeze(coefficients)
- val bcCoeffs = instances.context.broadcast(coeffs)
- val featuresStd = bcFeaturesStd.value
- val numFeatures = featuresStd.length
-
- val svmAggregator = {
- val seqOp = (c: LinearSVCAggregator, instance: Instance) => c.add(instance)
- val combOp = (c1: LinearSVCAggregator, c2: LinearSVCAggregator) => c1.merge(c2)
-
- instances.treeAggregate(
- new LinearSVCAggregator(bcCoeffs, bcFeaturesStd, fitIntercept)
- )(seqOp, combOp, aggregationDepth)
- }
-
- val totalGradientArray = svmAggregator.gradient.toArray
- // regVal is the sum of coefficients squares excluding intercept for L2 regularization.
- val regVal = if (regParamL2 == 0.0) {
- 0.0
- } else {
- var sum = 0.0
- coeffs.foreachActive { case (index, value) =>
- // We do not apply regularization to the intercepts
- if (index != numFeatures) {
- // The following code will compute the loss of the regularization; also
- // the gradient of the regularization, and add back to totalGradientArray.
- sum += {
- if (standardization) {
- totalGradientArray(index) += regParamL2 * value
- value * value
- } else {
- if (featuresStd(index) != 0.0) {
- // If `standardization` is false, we still standardize the data
- // to improve the rate of convergence; as a result, we have to
- // perform this reverse standardization by penalizing each component
- // differently to get effectively the same objective function when
- // the training dataset is not standardized.
- val temp = value / (featuresStd(index) * featuresStd(index))
- totalGradientArray(index) += regParamL2 * temp
- value * temp
- } else {
- 0.0
- }
- }
- }
- }
- }
- 0.5 * regParamL2 * sum
- }
- bcCoeffs.destroy(blocking = false)
-
- (svmAggregator.loss + regVal, new BDV(totalGradientArray))
- }
-}
-
-/**
- * LinearSVCAggregator computes the gradient and loss for hinge loss function, as used
- * in binary classification for instances in sparse or dense vector in an online fashion.
- *
- * Two LinearSVCAggregator can be merged together to have a summary of loss and gradient of
- * the corresponding joint dataset.
- *
- * This class standardizes feature values during computation using bcFeaturesStd.
- *
- * @param bcCoefficients The coefficients corresponding to the features.
- * @param fitIntercept Whether to fit an intercept term.
- * @param bcFeaturesStd The standard deviation values of the features.
- */
-private class LinearSVCAggregator(
- bcCoefficients: Broadcast[Vector],
- bcFeaturesStd: Broadcast[Array[Double]],
- fitIntercept: Boolean) extends Serializable {
-
- private val numFeatures: Int = bcFeaturesStd.value.length
- private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
- private var weightSum: Double = 0.0
- private var lossSum: Double = 0.0
- @transient private lazy val coefficientsArray = bcCoefficients.value match {
- case DenseVector(values) => values
- case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
- s" but got type ${bcCoefficients.value.getClass}.")
- }
- private lazy val gradientSumArray = new Array[Double](numFeaturesPlusIntercept)
-
- /**
- * Add a new training instance to this LinearSVCAggregator, and update the loss and gradient
- * of the objective function.
- *
- * @param instance The instance of data point to be added.
- * @return This LinearSVCAggregator object.
- */
- def add(instance: Instance): this.type = {
- instance match { case Instance(label, weight, features) =>
-
- if (weight == 0.0) return this
- val localFeaturesStd = bcFeaturesStd.value
- val localCoefficients = coefficientsArray
- val localGradientSumArray = gradientSumArray
-
- val dotProduct = {
- var sum = 0.0
- features.foreachActive { (index, value) =>
- if (localFeaturesStd(index) != 0.0 && value != 0.0) {
- sum += localCoefficients(index) * value / localFeaturesStd(index)
- }
- }
- if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
- sum
- }
- // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
- // Therefore the gradient is -(2y - 1)*x
- val labelScaled = 2 * label - 1.0
- val loss = if (1.0 > labelScaled * dotProduct) {
- weight * (1.0 - labelScaled * dotProduct)
- } else {
- 0.0
- }
-
- if (1.0 > labelScaled * dotProduct) {
- val gradientScale = -labelScaled * weight
- features.foreachActive { (index, value) =>
- if (localFeaturesStd(index) != 0.0 && value != 0.0) {
- localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index)
- }
- }
- if (fitIntercept) {
- localGradientSumArray(localGradientSumArray.length - 1) += gradientScale
- }
- }
-
- lossSum += loss
- weightSum += weight
- this
- }
- }
-
- /**
- * Merge another LinearSVCAggregator, and update the loss and gradient
- * of the objective function.
- * (Note that it's in place merging; as a result, `this` object will be modified.)
- *
- * @param other The other LinearSVCAggregator to be merged.
- * @return This LinearSVCAggregator object.
- */
- def merge(other: LinearSVCAggregator): this.type = {
-
- if (other.weightSum != 0.0) {
- weightSum += other.weightSum
- lossSum += other.lossSum
-
- var i = 0
- val localThisGradientSumArray = this.gradientSumArray
- val localOtherGradientSumArray = other.gradientSumArray
- val len = localThisGradientSumArray.length
- while (i < len) {
- localThisGradientSumArray(i) += localOtherGradientSumArray(i)
- i += 1
- }
- }
- this
- }
-
- def loss: Double = if (weightSum != 0) lossSum / weightSum else 0.0
-
- def gradient: Vector = {
- if (weightSum != 0) {
- val result = Vectors.dense(gradientSumArray.clone())
- scal(1.0 / weightSum, result)
- result
- } else {
- Vectors.dense(new Array[Double](numFeaturesPlusIntercept))
- }
- }
-}
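A note on the math the deleted aggregator implemented (and which the new HingeAggregator below preserves): with labels in {0, 1} mapped to y' = 2y - 1 in {-1, +1}, the per-instance loss is weight * max(0, 1 - y' * margin), and whenever the hinge is active the gradient is -y' * weight times the feature vector. A self-contained sketch of that single-instance computation, with a hypothetical helper name and the per-feature division by featuresStd omitted for brevity:

    // Weighted hinge loss and gradient for one instance with a {0, 1} label.
    // The intercept's gradient contribution, when fit, is just gradientScale.
    def hingeLossAndGradient(
        label: Double,
        weight: Double,
        features: Array[Double],
        coefficients: Array[Double],
        intercept: Double): (Double, Array[Double]) = {
      val margin = features.indices.foldLeft(intercept) { (sum, i) =>
        sum + coefficients(i) * features(i)
      }
      val labelScaled = 2 * label - 1.0     // map {0, 1} to {-1, +1}
      if (1.0 > labelScaled * margin) {     // hinge is active
        val gradientScale = -labelScaled * weight
        (weight * (1.0 - labelScaled * margin), features.map(_ * gradientScale))
      } else {
        (0.0, new Array[Double](features.length))
      }
    }

The deleted regVal block also carried a subtlety worth keeping in mind: when standardization is false, the penalty uses value / std(j)^2 rather than value, so the L2 term on the internally standardized coefficients matches a plain L2 penalty on the original-scale problem. In the new code that behavior survives as the Some(getFeaturesStd) argument to L2Regularization.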
http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
new file mode 100644
index 0000000..0300500
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg._
+
+/**
+ * HingeAggregator computes the gradient and loss for Hinge loss function as used in
+ * binary classification for instances in sparse or dense vector in an online fashion.
+ *
+ * Two HingeAggregators can be merged together to have a summary of loss and gradient of
+ * the corresponding joint dataset.
+ *
+ * This class standardizes feature values during computation using bcFeaturesStd.
+ *
+ * @param bcCoefficients The coefficients corresponding to the features.
+ * @param fitIntercept Whether to fit an intercept term.
+ * @param bcFeaturesStd The standard deviation values of the features.
+ */
+private[ml] class HingeAggregator(
+ bcFeaturesStd: Broadcast[Array[Double]],
+ fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
+ extends DifferentiableLossAggregator[Instance, HingeAggregator] {
+
+ private val numFeatures: Int = bcFeaturesStd.value.length
+ private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
+ @transient private lazy val coefficientsArray = bcCoefficients.value match {
+ case DenseVector(values) => values
+ case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
+ s" but got type ${bcCoefficients.value.getClass}.")
+ }
+ protected override val dim: Int = numFeaturesPlusIntercept
+
+ /**
+ * Add a new training instance to this HingeAggregator, and update the loss and gradient
+ * of the objective function.
+ *
+ * @param instance The instance of data point to be added.
+ * @return This HingeAggregator object.
+ */
+ def add(instance: Instance): this.type = {
+ instance match { case Instance(label, weight, features) =>
+ require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
+ s" Expecting $numFeatures but got ${features.size}.")
+ require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
+
+ if (weight == 0.0) return this
+ val localFeaturesStd = bcFeaturesStd.value
+ val localCoefficients = coefficientsArray
+ val localGradientSumArray = gradientSumArray
+
+ val dotProduct = {
+ var sum = 0.0
+ features.foreachActive { (index, value) =>
+ if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+ sum += localCoefficients(index) * value / localFeaturesStd(index)
+ }
+ }
+ if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
+ sum
+ }
+ // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
+ // Therefore the gradient is -(2y - 1)*x
+ val labelScaled = 2 * label - 1.0
+ val loss = if (1.0 > labelScaled * dotProduct) {
+ (1.0 - labelScaled * dotProduct) * weight
+ } else {
+ 0.0
+ }
+
+ if (1.0 > labelScaled * dotProduct) {
+ val gradientScale = -labelScaled * weight
+ features.foreachActive { (index, value) =>
+ if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+ localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index)
+ }
+ }
+ if (fitIntercept) {
+ localGradientSumArray(localGradientSumArray.length - 1) += gradientScale
+ }
+ }
+
+ lossSum += loss
+ weightSum += weight
+ this
+ }
+ }
+}
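The curried constructor above is what makes the class pluggable: applying only the first argument list produces the Broadcast[Vector] => HingeAggregator factory that RDDLossFunction calls once per coefficient vector the optimizer proposes. A hedged usage sketch, assuming broadcast featuresStd and coefficients prepared as in the test suite's helper below:

    // One aggregator per coefficient vector; fold instances into it.
    val agg = new HingeAggregator(bcFeaturesStd, fitIntercept = true)(bcCoefficients)
    instances.foreach(agg.add)    // accumulates weighted hinge loss and gradient
    val loss = agg.loss           // lossSum / weightSum
    val gradient = agg.gradient   // gradient sums scaled by 1 / weightSum

    // Partially applying only the first argument list yields the factory:
    val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, true)(_)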
http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
index f2b00d0..41a5d22 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
@@ -25,7 +25,8 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.classification.LinearSVCSuite._
import org.apache.spark.ml.feature.{Instance, LabeledPoint}
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
-import org.apache.spark.ml.param.{ParamMap, ParamsSuite}
+import org.apache.spark.ml.optim.aggregator.HingeAggregator
+import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -170,10 +171,10 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
assert(model2.intercept !== 0.0)
}
- test("sparse coefficients in SVCAggregator") {
+ test("sparse coefficients in HingeAggregator") {
val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
- val agg = new LinearSVCAggregator(bcCoefficients, bcFeaturesStd, true)
+ val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients)
val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") {
intercept[IllegalArgumentException] {
agg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))
http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
new file mode 100644
index 0000000..61b48ff
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
+import org.apache.spark.ml.util.TestingUtils._
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+
+class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
+
+ import DifferentiableLossAggregatorSuite.getClassificationSummarizers
+
+ @transient var instances: Array[Instance] = _
+ @transient var instancesConstantFeature: Array[Instance] = _
+ @transient var instancesConstantFeatureFiltered: Array[Instance] = _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ instances = Array(
+ Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
+ Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
+ Instance(0.0, 0.3, Vectors.dense(4.0, 0.5))
+ )
+ instancesConstantFeature = Array(
+ Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
+ Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
+ Instance(1.0, 0.3, Vectors.dense(1.0, 0.5))
+ )
+ instancesConstantFeatureFiltered = Array(
+ Instance(0.0, 0.1, Vectors.dense(2.0)),
+ Instance(1.0, 0.5, Vectors.dense(1.0)),
+ Instance(2.0, 0.3, Vectors.dense(0.5))
+ )
+ }
+
+ /** Get summary statistics for some data and create a new HingeAggregator. */
+ private def getNewAggregator(
+ instances: Array[Instance],
+ coefficients: Vector,
+ fitIntercept: Boolean): HingeAggregator = {
+ val (featuresSummarizer, ySummarizer) =
+ DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
+ val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+ val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
+ val bcCoefficients = spark.sparkContext.broadcast(coefficients)
+ new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients)
+ }
+
+ test("aggregator add method input size") {
+ val coefArray = Array(1.0, 2.0)
+ val interceptArray = Array(2.0)
+ val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
+ fitIntercept = true)
+ withClue("HingeAggregator features dimension must match coefficients dimension") {
+ intercept[IllegalArgumentException] {
+ agg.add(Instance(1.0, 1.0, Vectors.dense(2.0)))
+ }
+ }
+ }
+
+ test("negative weight") {
+ val coefArray = Array(1.0, 2.0)
+ val interceptArray = Array(2.0)
+ val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
+ fitIntercept = true)
+ withClue("HingeAggregator does not support negative instance weights") {
+ intercept[IllegalArgumentException] {
+ agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0)))
+ }
+ }
+ }
+
+ test("check sizes") {
+ val rng = new scala.util.Random
+ val numFeatures = instances.head.features.size
+ val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble))
+ val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble))
+ val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true)
+ val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept,
+ fitIntercept = false)
+ instances.foreach(aggIntercept.add)
+ instances.foreach(aggNoIntercept.add)
+
+ assert(aggIntercept.gradient.size === numFeatures + 1)
+ assert(aggNoIntercept.gradient.size === numFeatures)
+ }
+
+ test("check correctness") {
+ val coefArray = Array(1.0, 2.0)
+ val intercept = 1.0
+ val numFeatures = instances.head.features.size
+ val (featuresSummarizer, _) = getClassificationSummarizers(instances)
+ val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+ val weightSum = instances.map(_.weight).sum
+
+ val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)),
+ fitIntercept = true)
+ instances.foreach(agg.add)
+
+ // compute the loss
+ val stdCoef = coefArray.indices.map(i => coefArray(i) / featuresStd(i)).toArray
+ val lossSum = instances.map { case Instance(l, w, f) =>
+ val margin = BLAS.dot(Vectors.dense(stdCoef), f) + intercept
+ val labelScaled = 2 * l - 1.0
+ if (1.0 > labelScaled * margin) {
+ (1.0 - labelScaled * margin) * w
+ } else {
+ 0.0
+ }
+ }.sum
+ val loss = lossSum / weightSum
+
+ // compute the gradients
+ val gradientCoef = new Array[Double](numFeatures)
+ var gradientIntercept = 0.0
+ instances.foreach { case Instance(l, w, f) =>
+ val margin = BLAS.dot(f, Vectors.dense(coefArray)) + intercept
+ if (1.0 > (2 * l - 1.0) * margin) {
+ gradientCoef.indices.foreach { i =>
+ gradientCoef(i) += f(i) * -(2 * l - 1.0) * w / featuresStd(i)
+ }
+ gradientIntercept += -(2 * l - 1.0) * w
+ }
+ }
+ val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ / weightSum))
+
+ assert(loss ~== agg.loss relTol 0.01)
+ assert(gradient ~== agg.gradient relTol 0.01)
+ }
+
+ test("check with zero standard deviation") {
+ val binaryCoefArray = Array(1.0, 2.0)
+ val intercept = 1.0
+ val aggConstantFeatureBinary = getNewAggregator(instancesConstantFeature,
+ Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true)
+ instancesConstantFeature.foreach(aggConstantFeatureBinary.add)
+
+ val aggConstantFeatureBinaryFiltered = getNewAggregator(instancesConstantFeatureFiltered,
+ Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true)
+ instancesConstantFeatureFiltered.foreach(aggConstantFeatureBinaryFiltered.add)
+
+ // constant features should not affect gradient
+ assert(aggConstantFeatureBinary.gradient(0) === 0.0)
+ assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
index 16ef4af..4c7913d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
@@ -217,8 +217,6 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
}.sum
val loss = lossSum / weightSum
-
-
// compute the gradients
val gradientCoef = new Array[Double](numFeatures)
var gradientIntercept = 0.0