You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yl...@apache.org on 2017/08/25 02:22:33 UTC

spark git commit: [SPARK-21108][ML] convert LinearSVC to aggregator framework

Repository: spark
Updated Branches:
  refs/heads/master 05af2de0f -> f3676d639


[SPARK-21108][ML] convert LinearSVC to aggregator framework

## What changes were proposed in this pull request?

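Convert LinearSVC to the new aggregator framework: the private LinearSVCCostFun and LinearSVCAggregator are replaced by HingeAggregator, RDDLossFunction and L2Regularization.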

## How was this patch tested?

Existing unit tests, plus the new HingeAggregatorSuite.

Author: Yuhao Yang <yu...@intel.com>

Closes #18315 from hhbyyh/svcAggregator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3676d63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3676d63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3676d63

Branch: refs/heads/master
Commit: f3676d63913e0706e071b71e1742b8d57b102fba
Parents: 05af2de
Author: Yuhao Yang <yu...@intel.com>
Authored: Fri Aug 25 10:22:27 2017 +0800
Committer: Yanbo Liang <yb...@gmail.com>
Committed: Fri Aug 25 10:22:27 2017 +0800

----------------------------------------------------------------------
 .../spark/ml/classification/LinearSVC.scala     | 204 ++-----------------
 .../ml/optim/aggregator/HingeAggregator.scala   | 105 ++++++++++
 .../ml/classification/LinearSVCSuite.scala      |   7 +-
 .../optim/aggregator/HingeAggregatorSuite.scala | 163 +++++++++++++++
 .../aggregator/LogisticAggregatorSuite.scala    |   2 -
 5 files changed, 286 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index 8d556de..3b0666c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -25,11 +25,11 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Experimental, Since}
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg._
-import org.apache.spark.ml.linalg.BLAS._
+import org.apache.spark.ml.optim.aggregator.HingeAggregator
+import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -214,10 +214,20 @@ class LinearSVC @Since("2.2.0") (
       }
 
       val featuresStd = summarizer.variance.toArray.map(math.sqrt)
+      val getFeaturesStd = (j: Int) => featuresStd(j)
       val regParamL2 = $(regParam)
       val bcFeaturesStd = instances.context.broadcast(featuresStd)
-      val costFun = new LinearSVCCostFun(instances, $(fitIntercept),
-        $(standardization), bcFeaturesStd, regParamL2, $(aggregationDepth))
+      val regularization = if (regParamL2 != 0.0) {
+        val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
+        Some(new L2Regularization(regParamL2, shouldApply,
+          if ($(standardization)) None else Some(getFeaturesStd)))
+      } else {
+        None
+      }
+
+      val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
+      val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
+        $(aggregationDepth))
 
       def regParamL1Fun = (index: Int) => 0D
       val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
@@ -372,189 +382,3 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] {
     }
   }
 }
-
-/**
- * LinearSVCCostFun implements Breeze's DiffFunction[T] for hinge loss function
- */
-private class LinearSVCCostFun(
-    instances: RDD[Instance],
-    fitIntercept: Boolean,
-    standardization: Boolean,
-    bcFeaturesStd: Broadcast[Array[Double]],
-    regParamL2: Double,
-    aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
-
-  override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
-    val coeffs = Vectors.fromBreeze(coefficients)
-    val bcCoeffs = instances.context.broadcast(coeffs)
-    val featuresStd = bcFeaturesStd.value
-    val numFeatures = featuresStd.length
-
-    val svmAggregator = {
-      val seqOp = (c: LinearSVCAggregator, instance: Instance) => c.add(instance)
-      val combOp = (c1: LinearSVCAggregator, c2: LinearSVCAggregator) => c1.merge(c2)
-
-      instances.treeAggregate(
-        new LinearSVCAggregator(bcCoeffs, bcFeaturesStd, fitIntercept)
-      )(seqOp, combOp, aggregationDepth)
-    }
-
-    val totalGradientArray = svmAggregator.gradient.toArray
-    // regVal is the sum of coefficients squares excluding intercept for L2 regularization.
-    val regVal = if (regParamL2 == 0.0) {
-      0.0
-    } else {
-      var sum = 0.0
-      coeffs.foreachActive { case (index, value) =>
-        // We do not apply regularization to the intercepts
-        if (index != numFeatures) {
-          // The following code will compute the loss of the regularization; also
-          // the gradient of the regularization, and add back to totalGradientArray.
-          sum += {
-            if (standardization) {
-              totalGradientArray(index) += regParamL2 * value
-              value * value
-            } else {
-              if (featuresStd(index) != 0.0) {
-                // If `standardization` is false, we still standardize the data
-                // to improve the rate of convergence; as a result, we have to
-                // perform this reverse standardization by penalizing each component
-                // differently to get effectively the same objective function when
-                // the training dataset is not standardized.
-                val temp = value / (featuresStd(index) * featuresStd(index))
-                totalGradientArray(index) += regParamL2 * temp
-                value * temp
-              } else {
-                0.0
-              }
-            }
-          }
-        }
-      }
-      0.5 * regParamL2 * sum
-    }
-    bcCoeffs.destroy(blocking = false)
-
-    (svmAggregator.loss + regVal, new BDV(totalGradientArray))
-  }
-}
-
-/**
- * LinearSVCAggregator computes the gradient and loss for hinge loss function, as used
- * in binary classification for instances in sparse or dense vector in an online fashion.
- *
- * Two LinearSVCAggregator can be merged together to have a summary of loss and gradient of
- * the corresponding joint dataset.
- *
- * This class standardizes feature values during computation using bcFeaturesStd.
- *
- * @param bcCoefficients The coefficients corresponding to the features.
- * @param fitIntercept Whether to fit an intercept term.
- * @param bcFeaturesStd The standard deviation values of the features.
- */
-private class LinearSVCAggregator(
-    bcCoefficients: Broadcast[Vector],
-    bcFeaturesStd: Broadcast[Array[Double]],
-    fitIntercept: Boolean) extends Serializable {
-
-  private val numFeatures: Int = bcFeaturesStd.value.length
-  private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
-  private var weightSum: Double = 0.0
-  private var lossSum: Double = 0.0
-  @transient private lazy val coefficientsArray = bcCoefficients.value match {
-    case DenseVector(values) => values
-    case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
-      s" but got type ${bcCoefficients.value.getClass}.")
-  }
-  private lazy val gradientSumArray = new Array[Double](numFeaturesPlusIntercept)
-
-  /**
-   * Add a new training instance to this LinearSVCAggregator, and update the loss and gradient
-   * of the objective function.
-   *
-   * @param instance The instance of data point to be added.
-   * @return This LinearSVCAggregator object.
-   */
-  def add(instance: Instance): this.type = {
-    instance match { case Instance(label, weight, features) =>
-
-      if (weight == 0.0) return this
-      val localFeaturesStd = bcFeaturesStd.value
-      val localCoefficients = coefficientsArray
-      val localGradientSumArray = gradientSumArray
-
-      val dotProduct = {
-        var sum = 0.0
-        features.foreachActive { (index, value) =>
-          if (localFeaturesStd(index) != 0.0 && value != 0.0) {
-            sum += localCoefficients(index) * value / localFeaturesStd(index)
-          }
-        }
-        if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
-        sum
-      }
-      // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
-      // Therefore the gradient is -(2y - 1)*x
-      val labelScaled = 2 * label - 1.0
-      val loss = if (1.0 > labelScaled * dotProduct) {
-        weight * (1.0 - labelScaled * dotProduct)
-      } else {
-        0.0
-      }
-
-      if (1.0 > labelScaled * dotProduct) {
-        val gradientScale = -labelScaled * weight
-        features.foreachActive { (index, value) =>
-          if (localFeaturesStd(index) != 0.0 && value != 0.0) {
-            localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index)
-          }
-        }
-        if (fitIntercept) {
-          localGradientSumArray(localGradientSumArray.length - 1) += gradientScale
-        }
-      }
-
-      lossSum += loss
-      weightSum += weight
-      this
-    }
-  }
-
-  /**
-   * Merge another LinearSVCAggregator, and update the loss and gradient
-   * of the objective function.
-   * (Note that it's in place merging; as a result, `this` object will be modified.)
-   *
-   * @param other The other LinearSVCAggregator to be merged.
-   * @return This LinearSVCAggregator object.
-   */
-  def merge(other: LinearSVCAggregator): this.type = {
-
-    if (other.weightSum != 0.0) {
-      weightSum += other.weightSum
-      lossSum += other.lossSum
-
-      var i = 0
-      val localThisGradientSumArray = this.gradientSumArray
-      val localOtherGradientSumArray = other.gradientSumArray
-      val len = localThisGradientSumArray.length
-      while (i < len) {
-        localThisGradientSumArray(i) += localOtherGradientSumArray(i)
-        i += 1
-      }
-    }
-    this
-  }
-
-  def loss: Double = if (weightSum != 0) lossSum / weightSum else 0.0
-
-  def gradient: Vector = {
-    if (weightSum != 0) {
-      val result = Vectors.dense(gradientSumArray.clone())
-      scal(1.0 / weightSum, result)
-      result
-    } else {
-      Vectors.dense(new Array[Double](numFeaturesPlusIntercept))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
new file mode 100644
index 0000000..0300500
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg._
+
+/**
+ * HingeAggregator computes the gradient and loss of the hinge loss function, as used in
+ * binary classification with {0, 1} labels, for sparse or dense instances in an online fashion.
+ *
+ * Two HingeAggregators can be merged together to have a summary of loss and gradient of
+ * the corresponding joint dataset.
+ *
+ * Feature values are divided by bcFeaturesStd on the fly; features whose std is 0.0 are skipped.
+ *
+ * @param bcFeaturesStd The standard deviation values of the features.
+ * @param fitIntercept Whether to fit an intercept term (stored as the last coefficient).
+ * @param bcCoefficients The coefficients (features first); only DenseVector is supported.
+ */
+private[ml] class HingeAggregator(
+    bcFeaturesStd: Broadcast[Array[Double]],
+    fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
+  extends DifferentiableLossAggregator[Instance, HingeAggregator] {
+
+  private val numFeatures: Int = bcFeaturesStd.value.length
+  private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
+  @transient private lazy val coefficientsArray = bcCoefficients.value match {
+    case DenseVector(values) => values
+    case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
+      s" but got type ${bcCoefficients.value.getClass}.")
+  }
+  protected override val dim: Int = numFeaturesPlusIntercept
+
+  /**
+   * Add a training instance, accumulating its weighted hinge loss and gradient in place.
+   * Throws IllegalArgumentException on a feature-size mismatch or a negative weight.
+   *
+   * @param instance The data point to add; zero-weight instances are ignored.
+   * @return This HingeAggregator object.
+   */
+  def add(instance: Instance): this.type = {
+    instance match { case Instance(label, weight, features) =>
+      require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
+        s" Expecting $numFeatures but got ${features.size}.")
+      require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
+
+      if (weight == 0.0) return this
+      val localFeaturesStd = bcFeaturesStd.value
+      val localCoefficients = coefficientsArray
+      val localGradientSumArray = gradientSumArray
+
+      val dotProduct = {
+        var sum = 0.0
+        features.foreachActive { (index, value) =>
+          if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+            sum += localCoefficients(index) * value / localFeaturesStd(index)
+          }
+        }
+        if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
+        sum
+      }
+      // With {0, 1} labels y, the per-instance loss is max(0, 1 - (2y - 1) * f_w(x)).
+      // Its (sub)gradient is -(2y - 1) * x when the loss is positive, and 0 otherwise.
+      val labelScaled = 2 * label - 1.0
+      val loss = if (1.0 > labelScaled * dotProduct) {
+        (1.0 - labelScaled * dotProduct) * weight
+      } else {
+        0.0
+      }
+
+      if (1.0 > labelScaled * dotProduct) {
+        val gradientScale = -labelScaled * weight
+        features.foreachActive { (index, value) =>
+          if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+            localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index)
+          }
+        }
+        if (fitIntercept) {
+          localGradientSumArray(localGradientSumArray.length - 1) += gradientScale
+        }
+      }
+
+      lossSum += loss
+      weightSum += weight
+      this
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
index f2b00d0..41a5d22 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
@@ -25,7 +25,8 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.classification.LinearSVCSuite._
 import org.apache.spark.ml.feature.{Instance, LabeledPoint}
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
-import org.apache.spark.ml.param.{ParamMap, ParamsSuite}
+import org.apache.spark.ml.optim.aggregator.HingeAggregator
+import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -170,10 +171,10 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
     assert(model2.intercept !== 0.0)
   }
 
-  test("sparse coefficients in SVCAggregator") {
+  test("sparse coefficients in HingeAggregator") {
     val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
     val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
-    val agg = new LinearSVCAggregator(bcCoefficients, bcFeaturesStd, true)
+    val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients)
     val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") {
       intercept[IllegalArgumentException] {
         agg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))

http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
new file mode 100644
index 0000000..61b48ff
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
+import org.apache.spark.ml.util.TestingUtils._
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+
+/** Unit tests for HingeAggregator: input validation, gradient size, and loss/gradient values. */
+class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
+
+  import DifferentiableLossAggregatorSuite.getClassificationSummarizers
+
+  @transient var instances: Array[Instance] = _
+  @transient var instancesConstantFeature: Array[Instance] = _
+  @transient var instancesConstantFeatureFiltered: Array[Instance] = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    instances = Array(
+      Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
+      Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
+      Instance(0.0, 0.3, Vectors.dense(4.0, 0.5))
+    )
+    instancesConstantFeature = Array(
+      Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
+      Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
+      Instance(1.0, 0.3, Vectors.dense(1.0, 0.5))
+    )
+    // Same rows as instancesConstantFeature, minus the constant (zero-variance) first feature.
+    instancesConstantFeatureFiltered = Array(
+      Instance(0.0, 0.1, Vectors.dense(2.0)),
+      Instance(1.0, 0.5, Vectors.dense(1.0)),
+      Instance(1.0, 0.3, Vectors.dense(0.5))
+    )
+  }
+
+  /** Get summary statistics for some data and create a new HingeAggregator. */
+  private def getNewAggregator(
+      instances: Array[Instance],
+      coefficients: Vector,
+      fitIntercept: Boolean): HingeAggregator = {
+    val (featuresSummarizer, _) = getClassificationSummarizers(instances)
+    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+    val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
+    val bcCoefficients = spark.sparkContext.broadcast(coefficients)
+    new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients)
+  }
+
+  test("aggregator add method input size") {
+    val coefArray = Array(1.0, 2.0)
+    val interceptArray = Array(2.0)
+    val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
+      fitIntercept = true)
+    withClue("HingeAggregator features dimension must match coefficients dimension") {
+      intercept[IllegalArgumentException] {
+        agg.add(Instance(1.0, 1.0, Vectors.dense(2.0)))
+      }
+    }
+  }
+
+  test("negative weight") {
+    val coefArray = Array(1.0, 2.0)
+    val interceptArray = Array(2.0)
+    val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
+      fitIntercept = true)
+    withClue("HingeAggregator does not support negative instance weights") {
+      intercept[IllegalArgumentException] {
+        agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0)))
+      }
+    }
+  }
+
+  test("check sizes") {
+    val rng = new scala.util.Random(42)
+    val numFeatures = instances.head.features.size
+    val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble))
+    val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble))
+    val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true)
+    val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept, fitIntercept = false)
+    instances.foreach(aggIntercept.add)
+    instances.foreach(aggNoIntercept.add)
+
+    assert(aggIntercept.gradient.size === numFeatures + 1)
+    assert(aggNoIntercept.gradient.size === numFeatures)
+  }
+
+  test("check correctness") {
+    val coefArray = Array(1.0, 2.0)
+    val intercept = 1.0
+    val numFeatures = instances.head.features.size
+    val (featuresSummarizer, _) = getClassificationSummarizers(instances)
+    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+    val weightSum = instances.map(_.weight).sum
+
+    val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)),
+      fitIntercept = true)
+    instances.foreach(agg.add)
+
+    // expected loss: weighted mean hinge loss, with coefficients rescaled by 1 / featuresStd
+    val stdCoef = coefArray.indices.map(i => coefArray(i) / featuresStd(i)).toArray
+    val lossSum = instances.map { case Instance(l, w, f) =>
+      val margin = BLAS.dot(Vectors.dense(stdCoef), f) + intercept
+      val labelScaled = 2 * l - 1.0
+      if (1.0 > labelScaled * margin) {
+        (1.0 - labelScaled * margin) * w
+      } else {
+        0.0
+      }
+    }.sum
+    val loss = lossSum / weightSum
+
+    // expected gradient: must use the same standardized margin as the loss above
+    val gradientCoef = new Array[Double](numFeatures)
+    var gradientIntercept = 0.0
+    instances.foreach { case Instance(l, w, f) =>
+      val margin = BLAS.dot(Vectors.dense(stdCoef), f) + intercept
+      if (1.0 > (2 * l - 1.0) * margin) {
+        gradientCoef.indices.foreach { i =>
+          gradientCoef(i) += f(i) * -(2 * l - 1.0) * w / featuresStd(i)
+        }
+        gradientIntercept += -(2 * l - 1.0) * w
+      }
+    }
+    val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ / weightSum))
+
+    assert(loss ~== agg.loss relTol 0.01)
+    assert(gradient ~== agg.gradient relTol 0.01)
+  }
+
+  test("check with zero standard deviation") {
+    val binaryCoefArray = Array(1.0, 2.0)
+    val intercept = 1.0
+    val aggConstantFeatureBinary = getNewAggregator(instancesConstantFeature,
+      Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true)
+    instancesConstantFeature.foreach(aggConstantFeatureBinary.add)
+
+    // the filtered data has a single feature, so drop the constant feature's coefficient too
+    val aggConstantFeatureBinaryFiltered = getNewAggregator(instancesConstantFeatureFiltered,
+      Vectors.dense(binaryCoefArray.tail ++ Array(intercept)), fitIntercept = true)
+    instancesConstantFeatureFiltered.foreach(aggConstantFeatureBinaryFiltered.add)
+
+    // constant features should not affect gradient
+    assert(aggConstantFeatureBinary.gradient(0) === 0.0)
+    assert(aggConstantFeatureBinary.gradient(1) === aggConstantFeatureBinaryFiltered.gradient(0))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
index 16ef4af..4c7913d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
@@ -217,8 +217,6 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     }.sum
     val loss = lossSum / weightSum
 
-
-
     // compute the gradients
     val gradientCoef = new Array[Double](numFeatures)
     var gradientIntercept = 0.0


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org