You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/12/13 21:31:02 UTC

spark git commit: [SPARK-18471][MLLIB] In LBFGS, avoid sending huge vectors of 0

Repository: spark
Updated Branches:
  refs/heads/master e57e3938c -> 9e8a9d7c6


[SPARK-18471][MLLIB] In LBFGS, avoid sending huge vectors of 0

## What changes were proposed in this pull request?

CostFun used to send a dense vector of zeroes as a closure in a
treeAggregate call. To avoid that, we now pass an empty sparse vector as
the zero value of treeAggregate and convert it to a dense vector inside
seqOp and combOp, so the huge vector of zeroes is no longer sent.

## How was this patch tested?

Unit tests for the mllib module were run locally for correctness.

As for performance, we ran a heavy optimization on our production data (50 iterations on 128 MB weight vectors) and saw a significant decrease in both runtime and the number of containers killed for lack of off-heap memory.

Author: Anthony Truchet <a....@criteo.com>
Author: sethah <se...@gmail.com>
Author: Anthony Truchet <An...@users.noreply.github.com>

Closes #16037 from AnthonyTruchet/ENG-17719-lbfgs-only.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e8a9d7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e8a9d7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e8a9d7c

Branch: refs/heads/master
Commit: 9e8a9d7c6a847bc5e77f9a1004029ec27616da9d
Parents: e57e393
Author: Anthony Truchet <a....@criteo.com>
Authored: Tue Dec 13 21:30:57 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Dec 13 21:30:57 2016 +0000

----------------------------------------------------------------------
 .../apache/spark/mllib/optimization/LBFGS.scala | 28 +++++++++++++-------
 .../spark/mllib/optimization/LBFGSSuite.scala   | 19 +++++++++++++
 2 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9e8a9d7c/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
index e0e41f7..7a714db 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
@@ -241,16 +241,24 @@ object LBFGS extends Logging {
       val bcW = data.context.broadcast(w)
       val localGradient = gradient
 
-      val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
-          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
-            val l = localGradient.compute(
-              features, label, bcW.value, grad)
-            (grad, loss + l)
-          },
-          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
-            axpy(1.0, grad2, grad1)
-            (grad1, loss1 + loss2)
-          })
+      val seqOp = (c: (Vector, Double), v: (Double, Vector)) =>
+        (c, v) match {
+          case ((grad, loss), (label, features)) =>
+            val denseGrad = grad.toDense
+            val l = localGradient.compute(features, label, bcW.value, denseGrad)
+            (denseGrad, loss + l)
+        }
+
+      val combOp = (c1: (Vector, Double), c2: (Vector, Double)) =>
+        (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
+          val denseGrad1 = grad1.toDense
+          val denseGrad2 = grad2.toDense
+          axpy(1.0, denseGrad2, denseGrad1)
+          (denseGrad1, loss1 + loss2)
+       }
+
+      val zeroSparseVector = Vectors.sparse(n, Seq())
+      val (gradientSum, lossSum) = data.treeAggregate((zeroSparseVector, 0.0))(seqOp, combOp)
 
       // broadcasted model is not needed anymore
       bcW.destroy()

http://git-wip-us.apache.org/repos/asf/spark/blob/9e8a9d7c/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
index 75ae0eb..5729592 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
@@ -230,6 +230,25 @@ class LBFGSSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers
       (weightLBFGS(0) ~= weightGD(0) relTol 0.02) && (weightLBFGS(1) ~= weightGD(1) relTol 0.02),
       "The weight differences between LBFGS and GD should be within 2%.")
   }
+
+  test("SPARK-18471: LBFGS aggregator on empty partitions") {
+    val regParam = 0
+
+    val initialWeightsWithIntercept = Vectors.dense(0.0)
+    val convergenceTol = 1e-12
+    val numIterations = 1
+    val dataWithEmptyPartitions = sc.parallelize(Seq((1.0, Vectors.dense(2.0))), 2)
+
+    LBFGS.runLBFGS(
+      dataWithEmptyPartitions,
+      gradient,
+      simpleUpdater,
+      numCorrections,
+      convergenceTol,
+      numIterations,
+      regParam,
+      initialWeightsWithIntercept)
+  }
 }
 
 class LBFGSClusterSuite extends SparkFunSuite with LocalClusterSparkContext {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org