You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by db...@apache.org on 2017/02/28 00:35:09 UTC

spark git commit: [SPARK-19746][ML] Faster indexing for logistic aggregator

Repository: spark
Updated Branches:
  refs/heads/master 8a5a58506 -> 16d8472f7


[SPARK-19746][ML] Faster indexing for logistic aggregator

## What changes were proposed in this pull request?

JIRA: [SPARK-19746](https://issues.apache.org/jira/browse/SPARK-19746)

The following code is inefficient:

````scala
    val localCoefficients: Vector = bcCoefficients.value

    features.foreachActive { (index, value) =>
      val stdValue = value / localFeaturesStd(index)
      var j = 0
      while (j < numClasses) {
        margins(j) += localCoefficients(index * numClasses + j) * stdValue
        j += 1
      }
    }
````

`localCoefficients(index * numClasses + j)` calls `Vector.apply`, which creates a new Breeze vector and then indexes into it. Even if creating the object is not that slow, doing so on every access generates a lot of extra garbage that may result in longer GC pauses. This is a hot inner loop, so we should optimize wherever possible.

## How was this patch tested?

I don't think there's a great way to test this patch. It's purely performance-related, so unit tests should guarantee that we haven't made any unwanted changes. Empirically, I observed speedups of 10-40% just running short local tests. I suspect the biggest differences will be seen with large data/coefficient sizes, where the JVM has to pause for GC more often. I welcome other ideas for testing.

Author: sethah <se...@gmail.com>

Closes #17078 from sethah/logistic_agg_indexing.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16d8472f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16d8472f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16d8472f

Branch: refs/heads/master
Commit: 16d8472f74313b0d1932b69a9807fe0f2ad7057c
Parents: 8a5a585
Author: sethah <se...@gmail.com>
Authored: Tue Feb 28 00:34:38 2017 +0000
Committer: DB Tsai <db...@dbtsai.com>
Committed: Tue Feb 28 00:34:38 2017 +0000

----------------------------------------------------------------------
 .../ml/classification/LogisticRegression.scala  | 11 ++++++---
 .../LogisticRegressionSuite.scala               | 26 ++++++++++++++++++++
 2 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/16d8472f/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 892e00f..738b351 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -1431,7 +1431,12 @@ private class LogisticAggregator(
   private var weightSum = 0.0
   private var lossSum = 0.0
 
-  private val gradientSumArray = Array.fill[Double](coefficientSize)(0.0D)
+  @transient private lazy val coefficientsArray: Array[Double] = bcCoefficients.value match {
+    case DenseVector(values) => values
+    case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " +
+      s"got type ${bcCoefficients.value.getClass}.)")
+  }
+  private val gradientSumArray = new Array[Double](coefficientSize)
 
   if (multinomial && numClasses <= 2) {
     logInfo(s"Multinomial logistic regression for binary classification yields separate " +
@@ -1447,7 +1452,7 @@ private class LogisticAggregator(
       label: Double): Unit = {
 
     val localFeaturesStd = bcFeaturesStd.value
-    val localCoefficients = bcCoefficients.value
+    val localCoefficients = coefficientsArray
     val localGradientArray = gradientSumArray
     val margin = - {
       var sum = 0.0
@@ -1491,7 +1496,7 @@ private class LogisticAggregator(
       logistic regression without pivoting.
      */
     val localFeaturesStd = bcFeaturesStd.value
-    val localCoefficients = bcCoefficients.value
+    val localCoefficients = coefficientsArray
     val localGradientArray = gradientSumArray
 
     // marginOfLabel is margins(label) in the formula

http://git-wip-us.apache.org/repos/asf/spark/blob/16d8472f/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 43547a4..d89a958 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -456,6 +456,32 @@ class LogisticRegressionSuite
     assert(blrModel.intercept !== 0.0)
   }
 
+  test("sparse coefficients in LogisticAggregator") {
+    val bcCoefficientsBinary = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
+    val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
+    val binaryAgg = new LogisticAggregator(bcCoefficientsBinary, bcFeaturesStd, 2,
+      fitIntercept = true, multinomial = false)
+    val thrownBinary = withClue("binary logistic aggregator cannot handle sparse coefficients") {
+      intercept[IllegalArgumentException] {
+        binaryAgg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))
+      }
+    }
+    assert(thrownBinary.getMessage.contains("coefficients only supports dense"))
+
+    val bcCoefficientsMulti = spark.sparkContext.broadcast(Vectors.sparse(6, Array(0), Array(1.0)))
+    val multinomialAgg = new LogisticAggregator(bcCoefficientsMulti, bcFeaturesStd, 3,
+      fitIntercept = true, multinomial = true)
+    val thrown = withClue("multinomial logistic aggregator cannot handle sparse coefficients") {
+      intercept[IllegalArgumentException] {
+        multinomialAgg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))
+      }
+    }
+    assert(thrown.getMessage.contains("coefficients only supports dense"))
+    bcCoefficientsBinary.destroy(blocking = false)
+    bcFeaturesStd.destroy(blocking = false)
+    bcCoefficientsMulti.destroy(blocking = false)
+  }
+
   test("overflow prediction for multiclass") {
     val model = new LogisticRegressionModel("mLogReg",
       Matrices.dense(3, 2, Array(0.0, 0.0, 0.0, 1.0, 2.0, 3.0)),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org