Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/02 13:26:46 UTC

[GitHub] [spark] ykerzhner commented on a change in pull request #31693: [SPARK-34448][ML][WIP] Binary logistic regression incorrectly computes the intercept and coefficients when data is not centered

ykerzhner commented on a change in pull request #31693:
URL: https://github.com/apache/spark/pull/31693#discussion_r585560669



##########
File path: mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
##########
@@ -86,6 +87,21 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
       df
     }
 
+    binaryDatasetWithSmallVar = {
+      val nPoints = 10000
+      val coefficients = Array(-0.57997, 0.912083, -0.371077, -0.819866, 2.688191)
+      val xMean = Array(5.843, 3.057, 3.758, 1.199)
+      val xVariance = Array(0.6856, 0.1899, 3.116, 0.0001)

Review comment:
       We should be able to compute the intercept mathematically, and then see which solver comes closer to it.  What is the log(odds) of the target vector for this test case?
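
For reference, the log(odds) asked about here can be sketched as below. This is only an illustration, not code from the PR: the object name `LabelLogOdds` and the method `logOdds` are hypothetical, labels are assumed to be 0.0 or 1.0, and both classes are assumed to be present. For an intercept-only logistic model, the maximum-likelihood intercept is this value.

```scala
// Hypothetical helper, not part of Spark or of this PR.
object LabelLogOdds {
  // Weighted log-odds log(p / (1 - p)) of binary labels in {0.0, 1.0},
  // where p is the weighted fraction of positive labels.
  def logOdds(labels: Array[Double], weights: Array[Double]): Double = {
    require(labels.length == weights.length, "labels and weights must align")
    var positive = 0.0
    var total = 0.0
    var i = 0
    while (i < labels.length) {
      positive += weights(i) * labels(i)
      total += weights(i)
      i += 1
    }
    // p / (1 - p) simplifies to positive / (total - positive).
    math.log(positive / (total - positive))
  }
}
```

Comparing each solver's intercept in the centered space against this value would be one way to run the check suggested above.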

##########
File path: mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala
##########
@@ -606,3 +606,149 @@ private[ml] class BlockLogisticAggregator(
     }
   }
 }
+
+
+/**
+ * BlockBinaryLogisticAggregator computes the gradient and loss used in Logistic classification
+ * for blocks in sparse or dense matrix in an online fashion.
+ *
+ * Two BlockLogisticAggregators can be merged together to have a summary of loss and gradient of
+ * the corresponding joint dataset.
+ *
+ * NOTE: The feature values are expected to already have be scaled (divided by [[bcFeaturesStd]],
+ * NOT centered) before computation.
+ *
+ * @param bcCoefficients The coefficients corresponding to the features.
+ * @param fitIntercept Whether to fit an intercept term.
+ * @param fitWithMean Whether to center the data with mean before training. If true, we MUST adjust
+ *                    the intercept of both initial coefficients and final solution in the caller.
+ */
+private[ml] class BlockBinaryLogisticAggregator(
+    bcFeaturesStd: Broadcast[Array[Double]],
+    bcFeaturesMean: Broadcast[Array[Double]],
+    fitIntercept: Boolean,
+    fitWithMean: Boolean)(bcCoefficients: Broadcast[Vector])
+  extends DifferentiableLossAggregator[InstanceBlock, BlockBinaryLogisticAggregator] with Logging {
+
+  if (fitWithMean) {
+    require(fitIntercept, s"for training without intercept, should not center the vectors")
+  }
+
+  private val numFeatures = bcFeaturesStd.value.length
+  protected override val dim: Int = bcCoefficients.value.size
+
+  @transient private lazy val coefficientsArray = bcCoefficients.value match {
+    case DenseVector(values) => values
+    case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " +
+      s"got type ${bcCoefficients.value.getClass}.)")
+  }
+
+  @transient private lazy val linear = if (fitIntercept) {
+    new DenseVector(coefficientsArray.take(numFeatures))
+  } else {
+    new DenseVector(coefficientsArray)
+  }
+
+  @transient private lazy val scaledMean = if (fitWithMean) {

Review comment:
       👍 

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
##########
@@ -934,31 +934,56 @@ class LogisticRegression @Since("1.2.0") (
       instances: RDD[Instance],
       actualBlockSizeInMB: Double,
       featuresStd: Array[Double],
+      featuresMean: Array[Double],
       numClasses: Int,
-      initialCoefWithInterceptMatrix: Matrix,
+      initialCoefWithInterceptArray: Array[Double],
       regularization: Option[L2Regularization],
       optimizer: FirstOrderMinimizer[BDV[Double], DiffFunction[BDV[Double]]]) = {
     val numFeatures = featuresStd.length
     val bcFeaturesStd = instances.context.broadcast(featuresStd)
+    val bcFeaturesMean = instances.context.broadcast(featuresMean)
 
-    val standardized = instances.mapPartitions { iter =>
+    val scaled = instances.mapPartitions { iter =>
       val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 / std else 0.0 }
       val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
       iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
     }
 
     val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong
-    val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage)
+    val blocks = InstanceBlock.blokifyWithMaxMemUsage(scaled, maxMemUsage)
       .persist(StorageLevel.MEMORY_AND_DISK)
       .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)")
 
-    val getAggregatorFunc = new BlockLogisticAggregator(numFeatures, numClasses, $(fitIntercept),
-      checkMultinomial(numClasses))(_)
+    val multinomial = checkMultinomial(numClasses)
+    val fitWithMean = !multinomial && $(fitIntercept) &&
+      (!isSet(lowerBoundsOnIntercepts) || $(lowerBoundsOnIntercepts)(0).isNegInfinity) &&
+      (!isSet(upperBoundsOnIntercepts) || $(upperBoundsOnIntercepts)(0).isPosInfinity)
+
+    val costFun = if (multinomial) {
+      // TODO: create a separate BlockMultinomialLogisticAggregator for clearness
+      val getAggregatorFunc = new BlockLogisticAggregator(numFeatures, numClasses,
+         $(fitIntercept), true)(_)
+      new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth))
+    } else {
+       val getAggregatorFunc = new BlockBinaryLogisticAggregator(bcFeaturesStd, bcFeaturesMean,
+         $(fitIntercept), fitWithMean)(_)
+      new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth))
+    }
+
+    if (fitWithMean) {
+      var i = 0
+      var adapt = 0.0
+      while (i < numFeatures) {
+        if (featuresStd(i) != 0) {
+          adapt += initialCoefWithInterceptArray(i) * featuresMean(i) / featuresStd(i)

Review comment:
       @srowen Consider the linear transformation x -> (x - avg(x)) / std(x), applied to the term sum_i c_i * x_i + B, where B is the intercept.  Under this transformation the term becomes sum_i (c_i * x_i / std(x_i) - c_i * avg(x_i) / std(x_i)) + B = sum_i c_i * x_i / std(x_i) + (B - sum_i c_i * avg(x_i) / std(x_i)).  So the intercept changes by -sum_i c_i * avg(x_i) / std(x_i).  This is what is done in lines 1004-1113 below.  The code here attempts to do the opposite of that before the optimization starts.

       I actually don't think this is necessary at all.  For the initial value of the intercept, we want the log(odds) of the data after the centering, so we do not want to modify the intercept here.  In general, this entire operation would be a no-op, as all the coefficients are zero at the start.  The case where it would actually change the intercept is when bounds are given for the coefficients.  But again, the intercept in the centered/scaled space should be precisely the log(odds) of the target vector, so this attempt to change the intercept is actually counterproductive.
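
The intercept identity in this comment can be checked numerically with a small sketch. Everything here is illustrative: the object `InterceptShift` and its methods are hypothetical names, not part of the PR, and the zero-std guard simply mirrors the `featuresStd(i) != 0` check in the diff.

```scala
// Hypothetical helper, not part of Spark or of this PR.
object InterceptShift {
  // Margin on centered and scaled features: sum_i c_i * (x_i - avg_i) / std_i + b.
  def marginCentered(c: Array[Double], b: Double, x: Array[Double],
      avg: Array[Double], std: Array[Double]): Double = {
    var sum = b
    var i = 0
    while (i < x.length) {
      if (std(i) != 0) sum += c(i) * (x(i) - avg(i)) / std(i)
      i += 1
    }
    sum
  }

  // Margin on features that are scaled but NOT centered: sum_i c_i * x_i / std_i + b.
  def marginScaled(c: Array[Double], b: Double, x: Array[Double],
      std: Array[Double]): Double = {
    var sum = b
    var i = 0
    while (i < x.length) {
      if (std(i) != 0) sum += c(i) * x(i) / std(i)
      i += 1
    }
    sum
  }

  // The amount sum_i c_i * avg_i / std_i by which the two intercepts differ.
  def shift(c: Array[Double], avg: Array[Double], std: Array[Double]): Double = {
    var sum = 0.0
    var i = 0
    while (i < c.length) {
      if (std(i) != 0) sum += c(i) * avg(i) / std(i)
      i += 1
    }
    sum
  }
}
```

With intercept B in the centered space, `marginCentered(c, B, x, avg, std)` should match `marginScaled(c, B - shift(c, avg, std), x, std)` up to floating-point error. When every c_i is zero, `shift` returns 0.0, which is why the adjustment is a no-op for an all-zero starting point.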



