You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2014/06/02 20:48:27 UTC

git commit: [SPARK-1553] Alternating nonnegative least-squares

Repository: spark
Updated Branches:
  refs/heads/master 9535f4045 -> 9a5d482e0


[SPARK-1553] Alternating nonnegative least-squares

This pull request includes a nonnegative least-squares solver (NNLS) tailored to the kinds of small-scale problems that come up when training matrix factorisation models by alternating nonnegative least-squares (ANNLS).

The method used for the NNLS subproblems is based on the classical method of projected gradients.  There is a modification where, if the set of active constraints has not changed since the last iteration, a conjugate gradient step is considered and possibly rejected in favour of the gradient; this improves convergence once the optimal face has been located.

The NNLS solver is in `org.apache.spark.mllib.optimization.NNLSbyPCG`.

Author: Tor Myklebust <tm...@gmail.com>

Closes #460 from tmyklebu/annls and squashes the following commits:

79bc4b5 [Tor Myklebust] Merge branch 'master' of https://github.com/apache/spark into annls
199b0bc [Tor Myklebust] Make the ctor private again and use the builder pattern.
7fbabf1 [Tor Myklebust] Cleanup matrix math in NNLSSuite.
65ef7f2 [Tor Myklebust] Make ALS's ctor public and remove a couple of "convenience" wrappers.
2d4f3cb [Tor Myklebust] Cleanup.
0cb4481 [Tor Myklebust] Drop the iteration limit from 40k to max(400,20n).
e2a01d1 [Tor Myklebust] Create a workspace object for NNLS to cut down on memory allocations.
b285106 [Tor Myklebust] Clean up NNLS test cases.
9c820b6 [Tor Myklebust] Tweak variable names.
8a1a436 [Tor Myklebust] Describe the problem and add a reference to Polyak's paper.
5345402 [Tor Myklebust] Style fixes that got eaten.
ac673bd [Tor Myklebust] More safeguards against numerical ridiculousness.
c288b6a [Tor Myklebust] Finish moving the NNLS solver.
9a82fa6 [Tor Myklebust] Fix scalastyle moanings.
33bf4f2 [Tor Myklebust] Fix missing space.
89ea0a8 [Tor Myklebust] Hack ALSSuite to support NNLS testing.
f5dbf4d [Tor Myklebust] Teach ALS how to use the NNLS solver.
6cb563c [Tor Myklebust] Tests for the nonnegative least squares solver.
a68ac10 [Tor Myklebust] A nonnegative least-squares solver.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a5d482e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a5d482e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a5d482e

Branch: refs/heads/master
Commit: 9a5d482e090eaaea8491d3864667e0f513e7195c
Parents: 9535f40
Author: Tor Myklebust <tm...@gmail.com>
Authored: Mon Jun 2 11:48:09 2014 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Mon Jun 2 11:48:09 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/mllib/optimization/NNLS.scala  | 169 +++++++++++++++++++
 .../apache/spark/mllib/recommendation/ALS.scala |  33 +++-
 .../spark/mllib/optimization/NNLSSuite.scala    |  80 +++++++++
 .../spark/mllib/recommendation/ALSSuite.scala   |  32 ++--
 4 files changed, 300 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9a5d482e/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
new file mode 100644
index 0000000..e4b436b
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.optimization
+
+import org.jblas.{DoubleMatrix, SimpleBlas}
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * Object used to solve nonnegative least squares problems using a modified
+ * projected gradient method.
+ */
+private[mllib] object NNLS {
+  class Workspace(val n: Int) {
+    val scratch = new DoubleMatrix(n, 1)
+    val grad = new DoubleMatrix(n, 1)
+    val x = new DoubleMatrix(n, 1)
+    val dir = new DoubleMatrix(n, 1)
+    val lastDir = new DoubleMatrix(n, 1)
+    val res = new DoubleMatrix(n, 1)
+
+    def wipe() {
+      scratch.fill(0.0)
+      grad.fill(0.0)
+      x.fill(0.0)
+      dir.fill(0.0)
+      lastDir.fill(0.0)
+      res.fill(0.0)
+    }
+  }
+
+  def createWorkspace(n: Int): Workspace = {
+    new Workspace(n)
+  }
+
+  /**
+   * Solve a least squares problem, possibly with nonnegativity constraints, by a modified
+   * projected gradient method.  That is, find x minimising ||Ax - b||_2 given A^T A and A^T b.
+   *
+   * We solve the problem
+   *   min_x      1/2 x^T ata x^T - x^T atb
+   *   subject to x >= 0
+   *
+   * The method used is similar to one described by Polyak (B. T. Polyak, The conjugate gradient
+   * method in extremal problems, Zh. Vychisl. Mat. Mat. Fiz. 9(4)(1969), pp. 94-112) for bound-
+   * constrained nonlinear programming.  Polyak unconditionally uses a conjugate gradient
+   * direction, however, while this method only uses a conjugate gradient direction if the last
+   * iteration did not cause a previously-inactive constraint to become active.
+   */
+  def solve(ata: DoubleMatrix, atb: DoubleMatrix, ws: Workspace): Array[Double] = {
+    ws.wipe()
+
+    val n = atb.rows
+    val scratch = ws.scratch
+
+    // find the optimal unconstrained step
+    def steplen(dir: DoubleMatrix, res: DoubleMatrix): Double = {
+      val top = SimpleBlas.dot(dir, res)
+      SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch)
+      // Push the denominator upward very slightly to avoid infinities and silliness
+      top / (SimpleBlas.dot(scratch, dir) + 1e-20)
+    }
+
+    // stopping condition
+    def stop(step: Double, ndir: Double, nx: Double): Boolean = {
+        ((step.isNaN) // NaN
+      || (step < 1e-6) // too small or negative
+      || (step > 1e40) // too small; almost certainly numerical problems
+      || (ndir < 1e-12 * nx) // gradient relatively too small
+      || (ndir < 1e-32) // gradient absolutely too small; numerical issues may lurk
+      )
+    }
+
+    val grad = ws.grad
+    val x = ws.x
+    val dir = ws.dir
+    val lastDir = ws.lastDir
+    val res = ws.res
+    val iterMax = Math.max(400, 20 * n)
+    var lastNorm = 0.0
+    var iterno = 0
+    var lastWall = 0 // Last iteration when we hit a bound constraint.
+    var i = 0
+    while (iterno < iterMax) {
+      // find the residual
+      SimpleBlas.gemv(1.0, ata, x, 0.0, res)
+      SimpleBlas.axpy(-1.0, atb, res)
+      SimpleBlas.copy(res, grad)
+
+      // project the gradient
+      i = 0
+      while (i < n) {
+        if (grad.data(i) > 0.0 && x.data(i) == 0.0) {
+          grad.data(i) = 0.0
+        }
+        i = i + 1
+      }
+      val ngrad = SimpleBlas.dot(grad, grad)
+
+      SimpleBlas.copy(grad, dir)
+
+      // use a CG direction under certain conditions
+      var step = steplen(grad, res)
+      var ndir = 0.0
+      val nx = SimpleBlas.dot(x, x)
+      if (iterno > lastWall + 1) {
+        val alpha = ngrad / lastNorm
+        SimpleBlas.axpy(alpha, lastDir, dir)
+        val dstep = steplen(dir, res)
+        ndir = SimpleBlas.dot(dir, dir)
+        if (stop(dstep, ndir, nx)) {
+          // reject the CG step if it could lead to premature termination
+          SimpleBlas.copy(grad, dir)
+          ndir = SimpleBlas.dot(dir, dir)
+        } else {
+          step = dstep
+        }
+      } else {
+        ndir = SimpleBlas.dot(dir, dir)
+      }
+
+      // terminate?
+      if (stop(step, ndir, nx)) {
+        return x.data.clone
+      }
+
+      // don't run through the walls
+      i = 0
+      while (i < n) {
+        if (step * dir.data(i) > x.data(i)) {
+          step = x.data(i) / dir.data(i)
+        }
+        i = i + 1
+      }
+
+      // take the step
+      i = 0
+      while (i < n) {
+        if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) {
+          x.data(i) = 0
+          lastWall = iterno
+        } else {
+          x.data(i) -= step * dir.data(i)
+        }
+        i = i + 1
+      }
+
+      iterno = iterno + 1
+      SimpleBlas.copy(dir, lastDir)
+      lastNorm = ngrad
+    }
+    x.data.clone
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9a5d482e/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 0cf9a7f..cfc3b68 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -32,6 +32,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import org.apache.spark.util.Utils
+import org.apache.spark.mllib.optimization.NNLS
 
 /**
  * Out-link information for a user or product block. This includes the original user/product IDs
@@ -156,6 +157,18 @@ class ALS private (
     this
   }
 
+  /** If true, do alternating nonnegative least squares. */
+  private var nonnegative = false
+
+  /**
+   * Set whether the least-squares problems solved at each iteration should have
+   * nonnegativity constraints.
+   */
+  def setNonnegative(b: Boolean): ALS = {
+    this.nonnegative = b
+    this
+  }
+
   /**
    * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
    * Returns a MatrixFactorizationModel with feature vectors for each user and product.
@@ -505,6 +518,8 @@ class ALS private (
       }
     }
 
+    val ws = if (nonnegative) NNLS.createWorkspace(rank) else null
+
     // Solve the least-squares problem for each user and return the new feature vectors
     Array.range(0, numUsers).map { index =>
       // Compute the full XtX matrix from the lower-triangular part we got above
@@ -517,14 +532,27 @@ class ALS private (
       }
       // Solve the resulting matrix, which is symmetric and positive-definite
       if (implicitPrefs) {
-        Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data
+        solveLeastSquares(fullXtX.addi(YtY.get.value), userXy(index), ws)
       } else {
-        Solve.solvePositive(fullXtX, userXy(index)).data
+        solveLeastSquares(fullXtX, userXy(index), ws)
       }
     }
   }
 
   /**
+   * Given A^T A and A^T b, find the x minimising ||Ax - b||_2, possibly subject
+   * to nonnegativity constraints if `nonnegative` is true.
+   */
+  def solveLeastSquares(ata: DoubleMatrix, atb: DoubleMatrix,
+      ws: NNLS.Workspace): Array[Double] = {
+    if (!nonnegative) {
+      Solve.solvePositive(ata, atb).data
+    } else {
+      NNLS.solve(ata, atb, ws)
+    }
+  }
+
+  /**
    * Given a triangular matrix in the order of fillXtX above, compute the full symmetric square
    * matrix that it represents, storing it into destMatrix.
    */
@@ -550,7 +578,6 @@ class ALS private (
  * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
  */
 object ALS {
-
   /**
    * Train a matrix factorization model given an RDD of ratings given by users to some products,
    * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the

http://git-wip-us.apache.org/repos/asf/spark/blob/9a5d482e/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
new file mode 100644
index 0000000..bbf3852
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.optimization
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.jblas.{DoubleMatrix, SimpleBlas, NativeBlas}
+
+class NNLSSuite extends FunSuite {
+  /** Generate an NNLS problem whose optimal solution is the all-ones vector. */
+  def genOnesData(n: Int, rand: Random): (DoubleMatrix, DoubleMatrix) = {
+    val A = new DoubleMatrix(n, n, Array.fill(n*n)(rand.nextDouble()): _*)
+    val b = A.mmul(DoubleMatrix.ones(n, 1))
+
+    val ata = A.transpose.mmul(A)
+    val atb = A.transpose.mmul(b)
+
+    (ata, atb)
+  }
+
+  test("NNLS: exact solution cases") {
+    val n = 20
+    val rand = new Random(12346)
+    val ws = NNLS.createWorkspace(n)
+    var numSolved = 0
+
+    // About 15% of random 20x20 [-1,1]-matrices have a singular value less than 1e-3.  NNLS
+    // can legitimately fail to solve these anywhere close to exactly.  So we grab a considerable
+    // sample of these matrices and make sure that we solved a substantial fraction of them.
+
+    for (k <- 0 until 100) {
+      val (ata, atb) = genOnesData(n, rand)
+      val x = new DoubleMatrix(NNLS.solve(ata, atb, ws))
+      assert(x.length === n)
+      val answer = DoubleMatrix.ones(n, 1)
+      SimpleBlas.axpy(-1.0, answer, x)
+      val solved = (x.norm2 < 1e-2) && (x.normmax < 1e-3)
+      if (solved) numSolved = numSolved + 1
+    }
+
+    assert(numSolved > 50)
+  }
+
+  test("NNLS: nonnegativity constraint active") {
+    val n = 5
+    val ata = new DoubleMatrix(Array(
+      Array( 4.377, -3.531, -1.306, -0.139,  3.418),
+      Array(-3.531,  4.344,  0.934,  0.305, -2.140),
+      Array(-1.306,  0.934,  2.644, -0.203, -0.170),
+      Array(-0.139,  0.305, -0.203,  5.883,  1.428),
+      Array( 3.418, -2.140, -0.170,  1.428,  4.684)))
+    val atb = new DoubleMatrix(Array(-1.632, 2.115, 1.094, -1.025, -0.636))
+
+    val goodx = Array(0.13025, 0.54506, 0.2874, 0.0, 0.028628)
+
+    val ws = NNLS.createWorkspace(n)
+    val x = NNLS.solve(ata, atb, ws)
+    for (i <- 0 until n) {
+      assert(Math.abs(x(i) - goodx(i)) < 1e-3)
+      assert(x(i) >= 0)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9a5d482e/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 2d944f3..37c9b9d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -48,12 +48,18 @@ object ALSSuite {
       features: Int,
       samplingRate: Double,
       implicitPrefs: Boolean = false,
-      negativeWeights: Boolean = false): (Seq[Rating], DoubleMatrix, DoubleMatrix) = {
+      negativeWeights: Boolean = false,
+      negativeFactors: Boolean = true): (Seq[Rating], DoubleMatrix, DoubleMatrix) = {
     val rand = new Random(42)
 
     // Create a random matrix with uniform values from -1 to 1
-    def randomMatrix(m: Int, n: Int) =
-      new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*)
+    def randomMatrix(m: Int, n: Int) = {
+      if (negativeFactors) {
+        new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*)
+      } else {
+        new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble()): _*)
+      }
+    }
 
     val userMatrix = randomMatrix(users, features)
     val productMatrix = randomMatrix(features, products)
@@ -146,6 +152,10 @@ class ALSSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("NNALS, rank 2") {
+    testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, false)
+  }
+
   /**
    * Test if we can correctly factorize R = U * P where U and P are of known rank.
    *
@@ -159,19 +169,19 @@ class ALSSuite extends FunSuite with LocalSparkContext {
    * @param bulkPredict    flag to test bulk prediciton
    * @param negativeWeights whether the generated data can contain negative values
    * @param numBlocks      number of blocks to partition users and products into
+   * @param negativeFactors whether the generated user/product factors can have negative entries
    */
   def testALS(users: Int, products: Int, features: Int, iterations: Int,
     samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
-    bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1)
+    bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1,
+    negativeFactors: Boolean = true)
   {
     val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
-      features, samplingRate, implicitPrefs, negativeWeights)
-    val model = implicitPrefs match {
-      case false => ALS.train(sc.parallelize(sampledRatings), features, iterations, 0.01,
-          numBlocks, 0L)
-      case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations, 0.01,
-          numBlocks, 1.0, 0L)
-    }
+      features, samplingRate, implicitPrefs, negativeWeights, negativeFactors)
+
+    val model = (new ALS().setBlocks(numBlocks).setRank(features).setIterations(iterations)
+          .setAlpha(1.0).setImplicitPrefs(implicitPrefs).setLambda(0.01).setSeed(0L)
+          .setNonnegative(!negativeFactors).run(sc.parallelize(sampledRatings)))
 
     val predictedU = new DoubleMatrix(users, features)
     for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) {