Posted to commits@spark.apache.org by db...@apache.org on 2016/11/20 01:44:59 UTC

spark git commit: [SPARK-18456][ML][FOLLOWUP] Use matrix abstraction for coefficients in LogisticRegression training

Repository: spark
Updated Branches:
  refs/heads/master ea77c81ec -> 856e00420


[SPARK-18456][ML][FOLLOWUP] Use matrix abstraction for coefficients in LogisticRegression training

## What changes were proposed in this pull request?

This is a follow-up to some of the discussion [here](https://github.com/apache/spark/pull/15593). During LogisticRegression training, we store the coefficients and intercepts combined in a single flat vector, but a more natural abstraction is a matrix. Here, we refactor the code to use a matrix where possible, which makes the code more readable and greatly simplifies the indexing.
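
To make the simplification concrete, here is a minimal sketch (illustrative only; the variable names are hypothetical, and Breeze stands in for Spark's `DenseMatrix.update`, which is package-private): the same column-major storage, viewed once as a flat array and once as a matrix.

```scala
import breeze.linalg.{DenseMatrix => BDM}

// e.g. numClasses = 3, numFeatures = 2, fitIntercept = true
val numCoefficientSets = 3
val numFeaturesPlusIntercept = 3 // 2 features + 1 intercept column

// Before: one flat column-major array; every write needs hand-rolled index math.
val flat = new Array[Double](numCoefficientSets * numFeaturesPlusIntercept)
val classIndex = 1
flat(2 * numCoefficientSets + classIndex) = 0.7 // intercept of class 1

// After: the same storage viewed as a column-major matrix; the write is
// self-describing (row = class, last column = intercepts).
val mat = new BDM[Double](numCoefficientSets, numFeaturesPlusIntercept, flat)
mat(classIndex, 2) = 0.7 // no flat-index arithmetic

// The matrix shares `flat` as its backing array, so flattening it back for the
// optimizer (cf. `new BDV[Double](matrix.toArray)` in this patch) is cheap.
```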

Note: We do not use a Breeze matrix for the cost function as was mentioned in the linked PR. This is because LBFGS/OWLQN require an implicit `MutableInnerProductModule[DenseMatrix[Double], Double]` which is not natively defined in Breeze. We would need to extend Breeze in Spark to define it ourselves. Also, we do not modify the `regParamL1Fun` because OWLQN in Breeze requires a `MutableEnumeratedCoordinateField[(Int, Int), DenseVector[Double]]` (since we still use a dense vector for coefficients). Here again we would have to extend Breeze inside Spark.
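
For context, a minimal sketch of the boundary this implies (hypothetical class name; the real logic lives in `LogisticCostFun`): the optimizer's state type stays `BDV[Double]`, for which Breeze already provides the required implicits, and the matrix view is created and flattened back inside `calculate`.

```scala
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
import breeze.optimize.DiffFunction

// Hypothetical stand-in for LogisticCostFun. Keeping the state as BDV[Double]
// means Breeze's stock implicits satisfy LBFGS/OWLQN; the matrix abstraction
// is applied internally and flattened again at the boundary, mirroring
// `new BDV(totalGradientMatrix.toArray)` in this patch.
class MatrixBackedCostFun(rows: Int, cols: Int) extends DiffFunction[BDV[Double]] {
  override def calculate(coeffs: BDV[Double]): (Double, BDV[Double]) = {
    val coefMatrix = new BDM[Double](rows, cols, coeffs.toArray)
    // ... compute the loss and a gradient matrix of the same shape ...
    val gradMatrix = coefMatrix * 0.0 // placeholder gradient
    (0.0, gradMatrix.toDenseVector)   // flatten back to what the optimizer expects
  }
}
```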

## How was this patch tested?

This is an internal code refactoring; the existing unit tests continuing to pass shows that the change did not break anything. This patch adds no new functionality.

Author: sethah <se...@gmail.com>

Closes #15893 from sethah/logreg_refactor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/856e0042
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/856e0042
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/856e0042

Branch: refs/heads/master
Commit: 856e0042007c789dda4539fb19a5d4580999fbf4
Parents: ea77c81
Author: sethah <se...@gmail.com>
Authored: Sun Nov 20 01:42:37 2016 +0000
Committer: DB Tsai <db...@dbtsai.com>
Committed: Sun Nov 20 01:42:37 2016 +0000

----------------------------------------------------------------------
 .../ml/classification/LogisticRegression.scala  | 115 +++++++++----------
 1 file changed, 53 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/856e0042/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 71a7fe5..f58efd3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -463,16 +463,11 @@ class LogisticRegression @Since("1.2.0") (
         }
 
         /*
-          The coefficients are laid out in column major order during training. e.g. for
-          `numClasses = 3` and `numFeatures = 2` and `fitIntercept = true` the layout is:
-
-           Array(beta_11, beta_21, beta_31, beta_12, beta_22, beta_32, intercept_1, intercept_2,
-             intercept_3)
-
-           where beta_jk corresponds to the coefficient for class `j` and feature `k`.
+          The coefficients are laid out in column major order during training. Here we initialize
+          a column major matrix of initial coefficients.
          */
-        val initialCoefficientsWithIntercept =
-          Vectors.zeros(numCoefficientSets * numFeaturesPlusIntercept)
+        val initialCoefWithInterceptMatrix =
+          Matrices.zeros(numCoefficientSets, numFeaturesPlusIntercept)
 
         val initialModelIsValid = optInitialModel match {
           case Some(_initialModel) =>
@@ -491,18 +486,15 @@ class LogisticRegression @Since("1.2.0") (
         }
 
         if (initialModelIsValid) {
-          val initialCoefWithInterceptArray = initialCoefficientsWithIntercept.toArray
           val providedCoef = optInitialModel.get.coefficientMatrix
-          providedCoef.foreachActive { (row, col, value) =>
-            // convert matrix to column major for training
-            val flatIndex = col * numCoefficientSets + row
+          providedCoef.foreachActive { (classIndex, featureIndex, value) =>
             // We need to scale the coefficients since they will be trained in the scaled space
-            initialCoefWithInterceptArray(flatIndex) = value * featuresStd(col)
+            initialCoefWithInterceptMatrix.update(classIndex, featureIndex,
+              value * featuresStd(featureIndex))
           }
           if ($(fitIntercept)) {
-            optInitialModel.get.interceptVector.foreachActive { (index, value) =>
-              val coefIndex = numCoefficientSets * numFeatures + index
-              initialCoefWithInterceptArray(coefIndex) = value
+            optInitialModel.get.interceptVector.foreachActive { (classIndex, value) =>
+              initialCoefWithInterceptMatrix.update(classIndex, numFeatures, value)
             }
           }
         } else if ($(fitIntercept) && isMultinomial) {
@@ -532,8 +524,7 @@ class LogisticRegression @Since("1.2.0") (
           val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing
           val rawMean = rawIntercepts.sum / rawIntercepts.length
           rawIntercepts.indices.foreach { i =>
-            initialCoefficientsWithIntercept.toArray(numClasses * numFeatures + i) =
-              rawIntercepts(i) - rawMean
+            initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean)
           }
         } else if ($(fitIntercept)) {
           /*
@@ -549,12 +540,12 @@ class LogisticRegression @Since("1.2.0") (
                b = \log{P(1) / P(0)} = \log{count_1 / count_0}
              }}}
            */
-          initialCoefficientsWithIntercept.toArray(numFeatures) = math.log(
-            histogram(1) / histogram(0))
+          initialCoefWithInterceptMatrix.update(0, numFeatures,
+            math.log(histogram(1) / histogram(0)))
         }
 
         val states = optimizer.iterations(new CachedDiffFunction(costFun),
-          initialCoefficientsWithIntercept.asBreeze.toDenseVector)
+          new BDV[Double](initialCoefWithInterceptMatrix.toArray))
 
         /*
            Note that in Logistic Regression, the objective history (loss + regularization)
@@ -586,15 +577,24 @@ class LogisticRegression @Since("1.2.0") (
            Note that the intercept in scaled space and original space is the same;
            as a result, no scaling is needed.
          */
-        val rawCoefficients = state.x.toArray.clone()
-        val coefficientArray = Array.tabulate(numCoefficientSets * numFeatures) { i =>
-          val colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures
-          val featureIndex = i % numFeatures
-          if (featuresStd(featureIndex) != 0.0) {
-            rawCoefficients(colMajorIndex) / featuresStd(featureIndex)
-          } else {
-            0.0
+        val allCoefficients = state.x.toArray.clone()
+        val allCoefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept,
+          allCoefficients)
+        val denseCoefficientMatrix = new DenseMatrix(numCoefficientSets, numFeatures,
+          new Array[Double](numCoefficientSets * numFeatures), isTransposed = true)
+        val interceptVec = if ($(fitIntercept) || !isMultinomial) {
+          Vectors.zeros(numCoefficientSets)
+        } else {
+          Vectors.sparse(numCoefficientSets, Seq())
+        }
+        // separate intercepts and coefficients from the combined matrix
+        allCoefMatrix.foreachActive { (classIndex, featureIndex, value) =>
+          val isIntercept = $(fitIntercept) && (featureIndex == numFeatures)
+          if (!isIntercept && featuresStd(featureIndex) != 0.0) {
+            denseCoefficientMatrix.update(classIndex, featureIndex,
+              value / featuresStd(featureIndex))
           }
+          if (isIntercept) interceptVec.toArray(classIndex) = value
         }
 
         if ($(regParam) == 0.0 && isMultinomial) {
@@ -607,17 +607,16 @@ class LogisticRegression @Since("1.2.0") (
             Friedman, et al. "Regularization Paths for Generalized Linear Models via
               Coordinate Descent," https://core.ac.uk/download/files/153/6287975.pdf
            */
-          val coefficientMean = coefficientArray.sum / coefficientArray.length
-          coefficientArray.indices.foreach { i => coefficientArray(i) -= coefficientMean}
+          val denseValues = denseCoefficientMatrix.values
+          val coefficientMean = denseValues.sum / denseValues.length
+          denseCoefficientMatrix.update(_ - coefficientMean)
         }
 
-        val denseCoefficientMatrix =
-          new DenseMatrix(numCoefficientSets, numFeatures, coefficientArray, isTransposed = true)
         // TODO: use `denseCoefficientMatrix.compressed` after SPARK-17471
         val compressedCoefficientMatrix = if (isMultinomial) {
           denseCoefficientMatrix
         } else {
-          val compressedVector = Vectors.dense(coefficientArray).compressed
+          val compressedVector = Vectors.dense(denseCoefficientMatrix.values).compressed
           compressedVector match {
             case dv: DenseVector => denseCoefficientMatrix
             case sv: SparseVector =>
@@ -626,25 +625,13 @@ class LogisticRegression @Since("1.2.0") (
           }
         }
 
-        val interceptsArray: Array[Double] = if ($(fitIntercept)) {
-          Array.tabulate(numCoefficientSets) { i =>
-            val coefIndex = numFeatures * numCoefficientSets + i
-            rawCoefficients(coefIndex)
-          }
-        } else {
-          Array.empty[Double]
-        }
-        val interceptVector = if (interceptsArray.nonEmpty && isMultinomial) {
-          // The intercepts are never regularized, so we always center the mean.
-          val interceptMean = interceptsArray.sum / numClasses
-          interceptsArray.indices.foreach { i => interceptsArray(i) -= interceptMean }
-          Vectors.dense(interceptsArray)
-        } else if (interceptsArray.length == 1) {
-          Vectors.dense(interceptsArray)
-        } else {
-          Vectors.sparse(numCoefficientSets, Seq())
+        // center the intercepts when using multinomial algorithm
+        if ($(fitIntercept) && isMultinomial) {
+          val interceptArray = interceptVec.toArray
+          val interceptMean = interceptArray.sum / interceptArray.length
+          (0 until interceptVec.size).foreach { i => interceptArray(i) -= interceptMean }
         }
-        (compressedCoefficientMatrix, interceptVector.compressed, arrayBuilder.result())
+        (compressedCoefficientMatrix, interceptVec.compressed, arrayBuilder.result())
       }
     }
 
@@ -1424,6 +1411,7 @@ private class LogisticAggregator(
   private val numFeatures = bcFeaturesStd.value.length
   private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
   private val coefficientSize = bcCoefficients.value.size
+  private val numCoefficientSets = if (multinomial) numClasses else 1
   if (multinomial) {
     require(numClasses ==  coefficientSize / numFeaturesPlusIntercept, s"The number of " +
       s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize")
@@ -1633,12 +1621,12 @@ private class LogisticAggregator(
     lossSum / weightSum
   }
 
-  def gradient: Vector = {
+  def gradient: Matrix = {
     require(weightSum > 0.0, s"The effective number of instances should be " +
       s"greater than 0.0, but $weightSum.")
     val result = Vectors.dense(gradientSumArray.clone())
     scal(1.0 / weightSum, result)
-    result
+    new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, result.toArray)
   }
 }
 
@@ -1664,6 +1652,7 @@ private class LogisticCostFun(
     val featuresStd = bcFeaturesStd.value
     val numFeatures = featuresStd.length
     val numCoefficientSets = if (multinomial) numClasses else 1
+    val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
 
     val logisticAggregator = {
       val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
@@ -1675,24 +1664,25 @@ private class LogisticCostFun(
       )(seqOp, combOp, aggregationDepth)
     }
 
-    val totalGradientArray = logisticAggregator.gradient.toArray
+    val totalGradientMatrix = logisticAggregator.gradient
+    val coefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, coeffs.toArray)
     // regVal is the sum of coefficients squares excluding intercept for L2 regularization.
     val regVal = if (regParamL2 == 0.0) {
       0.0
     } else {
       var sum = 0.0
-      coeffs.foreachActive { case (index, value) =>
+      coefMatrix.foreachActive { case (classIndex, featureIndex, value) =>
         // We do not apply regularization to the intercepts
-        val isIntercept = fitIntercept && index >= numCoefficientSets * numFeatures
+        val isIntercept = fitIntercept && (featureIndex == numFeatures)
         if (!isIntercept) {
           // The following code will compute the loss of the regularization; also
           // the gradient of the regularization, and add back to totalGradientArray.
           sum += {
             if (standardization) {
-              totalGradientArray(index) += regParamL2 * value
+              val gradValue = totalGradientMatrix(classIndex, featureIndex)
+              totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * value)
               value * value
             } else {
-              val featureIndex = index / numCoefficientSets
               if (featuresStd(featureIndex) != 0.0) {
                 // If `standardization` is false, we still standardize the data
                 // to improve the rate of convergence; as a result, we have to
@@ -1700,7 +1690,8 @@ private class LogisticCostFun(
                 // differently to get effectively the same objective function when
                 // the training dataset is not standardized.
                 val temp = value / (featuresStd(featureIndex) * featuresStd(featureIndex))
-                totalGradientArray(index) += regParamL2 * temp
+                val gradValue = totalGradientMatrix(classIndex, featureIndex)
+                totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * temp)
                 value * temp
               } else {
                 0.0
@@ -1713,6 +1704,6 @@ private class LogisticCostFun(
     }
     bcCoeffs.destroy(blocking = false)
 
-    (logisticAggregator.loss + regVal, new BDV(totalGradientArray))
+    (logisticAggregator.loss + regVal, new BDV(totalGradientMatrix.toArray))
   }
 }

