Posted to reviews@spark.apache.org by coderxiang <gi...@git.apache.org> on 2014/04/20 23:49:27 UTC

[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

GitHub user coderxiang opened a pull request:

    https://github.com/apache/spark/pull/458

    [SPARK-1543][MLlib] Add ADMM for solving Lasso (and elastic net) problem

    This PR introduces the Alternating Direction Method of Multipliers (ADMM) for solving Lasso (elastic net, in fact) in MLlib.
    
    ADMM can solve a class of composite minimization problems in a distributed way. For Lasso (L1 regularization only) or elastic net (both L1 and L2 regularization), it requires solving independent systems of linear equations on each partition and a soft-thresholding operation on the driver. Unlike SGD, it is a deterministic algorithm (except for the random partitioning). Details can be found in [S. Boyd's paper](http://www.stanford.edu/~boyd/papers/admm_distr_stats.html).
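
For reference, the soft-thresholding step mentioned above can be sketched in a few lines of plain Scala. This is only an illustration and not the code in this PR: the names `softThreshold`, `softThresholdAll` and `zUpdate` are made up here, and the z-update assumes the scaled-form consensus Lasso update from Boyd's paper, which may differ from how the PR scales its dual variable.

```scala
object SoftThreshold {
  /** S_kappa(a) = sign(a) * max(|a| - kappa, 0): shrinks a toward zero by kappa. */
  def softThreshold(a: Double, kappa: Double): Double =
    math.signum(a) * math.max(math.abs(a) - kappa, 0.0)

  /** Applies the operator to every coordinate of a vector. */
  def softThresholdAll(v: Array[Double], kappa: Double): Array[Double] =
    v.map(a => softThreshold(a, kappa))

  /**
   * Driver-side consensus update (assumed scaled form):
   *   z = S_{lambda / (rho * numBlocks)}(xBar + uBar)
   * where xBar and uBar are the averages of the per-partition x and u vectors.
   */
  def zUpdate(
      xBar: Array[Double],
      uBar: Array[Double],
      lambda: Double,
      rho: Double,
      numBlocks: Int): Array[Double] = {
    val sum = xBar.zip(uBar).map { case (x, u) => x + u }
    softThresholdAll(sum, lambda / (rho * numBlocks))
  }
}
```

Coordinates whose magnitude is below the threshold are set exactly to zero, which is where the sparsity of the Lasso solution comes from.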
    
    The linear algebra operations rely mainly on the Breeze library; in particular, the code applies `breeze.linalg.cholesky` to perform a Cholesky decomposition on each partition to solve the linear system.
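
To make the per-partition solve concrete, here is a dependency-free sketch of solving a symmetric positive definite system with a Cholesky factor. The PR itself uses `breeze.linalg.cholesky`; the object and method names below are hypothetical and only illustrate the factor-once, solve-many pattern.

```scala
object CholeskySolve {
  type Matrix = Array[Array[Double]]

  /** Returns lower-triangular L with A = L * L^T; A must be symmetric positive definite. */
  def cholesky(a: Matrix): Matrix = {
    val n = a.length
    val l = Array.ofDim[Double](n, n)
    for (i <- 0 until n; j <- 0 to i) {
      var s = a(i)(j)
      for (k <- 0 until j) s -= l(i)(k) * l(j)(k)
      if (i == j) {
        require(s > 0.0, "matrix is not positive definite")
        l(i)(i) = math.sqrt(s)
      } else {
        l(i)(j) = s / l(j)(j)
      }
    }
    l
  }

  /** Solves A x = b given L: forward substitution L y = b, then back substitution L^T x = y. */
  def solve(l: Matrix, b: Array[Double]): Array[Double] = {
    val n = l.length
    val y = new Array[Double](n)
    for (i <- 0 until n) {
      var s = b(i)
      for (k <- 0 until i) s -= l(i)(k) * y(k)
      y(i) = s / l(i)(i)
    }
    val x = new Array[Double](n)
    for (i <- (n - 1) to 0 by -1) {
      var s = y(i)
      for (k <- (i + 1) until n) s -= l(k)(i) * x(k)
      x(i) = s / l(i)(i)
    }
    x
  }
}
```

Because the matrix being factored does not change between ADMM iterations, the factor can be computed once per partition and reused, so each later iteration only pays for the two triangular solves.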
    
    I tried to follow the organization of the existing Lasso implementation. However, as ADMM is also a good fit for similar optimization problems, e.g., (sparse) logistic regression, it may be worth reorganizing the code and putting ADMM into a separate section.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/coderxiang/spark admmlasso

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/458.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #458
    
----
commit 6c06c710dd098a3280fe5a9a5d5f490f6dcd3b9c
Author: lebesgue <le...@lebesgue.net>
Date:   2014-04-20T21:37:28Z

    add admm for lasso

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by debasish83 <gi...@git.apache.org>.
Github user debasish83 commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-42134510
  
    @coderxiang ADMM should be compared against the BFGS-based classification/regression that @dbtsai is working on. ADMM should reduce the network transfer from worker to master and thus improve the runtime without affecting misclassification error. Are you planning to do that comparison?



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by debasish83 <gi...@git.apache.org>.
Github user debasish83 commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-42220945
  
    Depends on how you solve L1 with L-BFGS...
    
    OWL-QN for L1 is definitely a solution...
    
    You can also replace L1 with soft-max, but then you have to be careful
    with the schedule of soft-max smoothness...
    
    I think just picking OWL-QN for L1 (as it is implemented in Breeze) and
    comparing it against ADMM will be good...
    
    
    
    On Sun, May 4, 2014 at 10:31 PM, DB Tsai <no...@github.com> wrote:
    
    > lbfgs is not good for L1 problem. I'm working on and preparing to do
    > benchmark with bfgs variant OWL-QN for L1 which is ideal to be compared
    > with ADMM.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r12023334
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMMLasso.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV, DenseMatrix => BDM, cholesky, norm}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.{Partitioner, HashPartitioner, Logging}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.annotation.DeveloperApi
    +
    --- End diff --
    
    remove extra line



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r11833279
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala ---
    @@ -189,3 +230,70 @@ object LassoWithSGD {
         sc.stop()
       }
     }
    +
    +object LassoWithADMM {
    +  /**
    +   * Train a Lasso model given an RDD of (label, features) pairs using ADMM. We run a fixed number
    +   * of outer ADMM iterations. The weights are initialized using the initial weights provided.
    +   *
    +   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
    +   *              matrix A as well as the corresponding right hand side label y
    +   * @param numPartitions Number of data blocks to partition the data into
    +   * @param numIterations Number of iterations of gradient descent to run.
    +   * @param l1RegParam l1-regularization parameter
    +   * @param l2RegParam l2-regularization parameter
    +   * @param penalty ADMM penalty of the constraint
    +   * @param initialWeights set of weights to be used. Array should be equal in size to
    +   *        the number of features in the data.
    +   */
    +  def train(
    +             input: RDD[LabeledPoint],
    +             numPartitions: Int,
    +             numIterations: Int,
    +             l1RegParam: Double,
    +             l2RegParam: Double,
    +             penalty: Double,
    +             initialWeights: Vector): LassoModel = {
    +    new LassoWithADMM(numPartitions, numIterations, l1RegParam, l2RegParam, penalty)
    +      .run(input, initialWeights)
    +  }
    +
    +  /**
    +   * Train a Lasso model given an RDD of (label, features) pairs using ADMM. We run a fixed number
    +   * of outer ADMM iterations. The weights are initialized using default value.
    +   *
    +   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
    +   *              matrix A as well as the corresponding right hand side label y
    +   * @param numPartitions Number of data blocks to partition the data into
    +   * @param numIterations Number of iterations of gradient descent to run.
    +   * @param l1RegParam l1-regularization parameter
    +   * @param l2RegParam l2-regularization parameter
    +   * @param penalty ADMM penalty of the constraint
    +   */
    +  def train(
    +             input: RDD[LabeledPoint],
    +             numPartitions: Int,
    +             numIterations: Int,
    +             l1RegParam: Double,
    +             l2RegParam: Double,
    +             penalty: Double): LassoModel = {
    +    new LassoWithADMM(numPartitions, numIterations, l1RegParam, l2RegParam, penalty).run(input)
    +  }
    +
    +  def main(args: Array[String]) {
    +    if (args.length != 5) {
    --- End diff --
    
    length of `args` is 7.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r11833024
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMMLasso.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV, DenseMatrix => BDM, cholesky, norm}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.{Partitioner, HashPartitioner, Logging}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.annotation.DeveloperApi
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Class used to solve the optimization problem in ADMMLasso
    + */
    +@DeveloperApi
    +class ADMMLasso
    +  extends Optimizer with Logging
    +{
    +  private var numPartitions: Int = 10
    +  private var numIterations: Int = 100
    +  private var l1RegParam: Double = 1.0
    +  private var l2RegParam: Double = .0
    +  private var penalty: Double = 10.0
    +
    +
    +  /**
    +   * Set the number of partitions for ADMM. Default 10
    +   */
    +  def setNumPartitions(parts: Int): this.type = {
    +    this.numPartitions = parts
    +    this
    +  }
    +
    +  /**
    +   * Set the number of iterations for ADMM. Default 100.
    +   */
    +  def setNumIterations(iters: Int): this.type = {
    +    this.numIterations = iters
    +    this
    +  }
    +
    +  /**
    +   * Set the l1-regularization parameter. Default 1.0.
    +   */
    +  def setL1RegParam(regParam: Double): this.type = {
    +    this.l1RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the l2-regularization parameter. Default .0
    +   */
    +  def setL2RegParam(regParam: Double): this.type = {
    +    this.l2RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the penalty parameter. Default 10.0
    +   */
    +  def setPenalty(penalty: Double): this.type = {
    +    this.penalty = penalty
    +    this
    +  }
    +
    +  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
    +    val (weights, _) = ADMMLasso.runADMM(data, numPartitions, numIterations, l1RegParam,
    +      l2RegParam, penalty, initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + * Top-level method to run ADMMLasso.
    + */
    +@DeveloperApi
    +object ADMMLasso extends Logging {
    +
    +  /**
    +   * @param data  Input data for ADMMLasso. RDD of the set of data examples, each of
    +   *               the form (label, [feature values]).
    +   * @param numPartitions  number of data blocks to partition the RDD into
    +   * @param numIterations  number of iterations that ADMM should be run.
    +   * @param l1RegParam  l1-regularization parameter
    +   * @param l2RegParam  l2-regularization parameter
    +   * @param penalty  The penalty parameter in ADMM
    +   * @param initialWeights  Initial set of weights to be used. Array should be equal in size to
    +   *        the number of features in the data.
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the loss
    +   *         computed for every iteration.
    +   */
    +  def runADMM(
    +      data: RDD[(Double, Vector)],
    +      numPartitions: Int,
    +      numIterations: Int,
    +      l1RegParam: Double,
    +      l2RegParam: Double,
    +      penalty: Double,
    +      initialWeights: Vector): (Vector, Array[Double]) = {
    +
    +    val lossHistory = new ArrayBuffer[Double](numIterations)
    +
    +    // Initialize weights as a column vector
    +    val p = initialWeights.size
    +
    +    // Consensus variable
    +    var z =  BDV.zeros[Double](p)
    +
    +    // Transform the input data into ADMM format
    +    def collectBlock(it: Iterator[(Vector, Double)]):
    +        Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    +      val lab = new ArrayBuffer[Double]()
    +      val features = new ArrayBuffer[Double]()
    +      var row = 0
    +      it.foreach {case (f, l) =>
    +        lab += l
    +        features ++= f.toArray
    +        row += 1
    +      }
    +      val col = features.length/row
    +
    +      val designMatrix = new BDM(col, features.toArray).t
    +
    +      // Precompute the cholesky decomposition for solving linear system inside each partition
    +      val chol = if (row >= col) {
    +        cholesky((designMatrix.t * designMatrix) + (BDM.eye[Double](col) :* penalty))
    +      }
    +      else cholesky(((designMatrix * designMatrix.t) :/ penalty) + BDM.eye[Double](row))
    +
    +      Iterator(((BDV(lab.toArray), designMatrix, chol),
    +        (BDV(initialWeights.toArray), BDV.zeros[Double](col))))
    +    }
    +
    +    val partitionedData = data.map{case (x, y) => (y, x)}
    +      .partitionBy(new HashPartitioner(numPartitions)).cache()
    +
    +    // ((lab, design, chol), (x, u))
    +    var dividedData = partitionedData.mapPartitions(collectBlock, true)
    +
    +    var iter = 1
    +    var minorChange: Boolean = false
    +    while (iter <= numIterations && !minorChange) {
    +      val zBroadcast = z
    +      def localUpdate(
    +           it: Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))]):
    +           Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    +        if (it.hasNext) {
    +          val localData = it.next()
    +          val (x, u) = localData._2
    +          val updatedU = u + ((x - zBroadcast) :* penalty)
    +          // Update local x by solving linear system Ax = q
    +          val (lab, design, chol) = localData._1
    +          val (row, col) = (design.rows, design.cols)
    +          val q = (design.t * lab) + (zBroadcast :* penalty) - u
    +
    +          val updatedX = if (row >= col) {
    +            chol.t \ (chol \ q)
    +          }else {
    --- End diff --
    
    add a space after `}`



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by coderxiang <gi...@git.apache.org>.
Github user coderxiang commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-40992367
  
    @yinxusen Thanks for the comments. I'm running some comparisons between SGD and ADMM right now and will try to post them later. It would also be great if you could provide further testing.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-41079198
  
    Right - the pattern is virtually identical except for an update function call. Can we abstract this away so that we can deliver the first 3 ADMM algorithms with a few lines of code and make it straightforward to add new versions of ADMM algorithms?
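
One possible shape for such an abstraction is sketched below in plain, single-machine Scala. It is not taken from the PR: `Updater`, `run` and `QuadraticUpdater` are hypothetical names, the loop assumes the scaled-form consensus ADMM from Boyd's paper, and in Spark the per-block loop would become a `mapPartitions` and the average a reduce.

```scala
object ConsensusADMM {
  type Vec = Array[Double]

  /** The only problem-specific pieces: a per-block x-update and a driver-side z-update. */
  trait Updater {
    /** argmin_x f_block(x) + (rho / 2) * ||x - z + u||^2 */
    def localUpdate(block: Int, z: Vec, u: Vec, rho: Double): Vec
    /** Maps the average of (x_i + u_i) to the new z; for Lasso this would soft-threshold. */
    def consensusUpdate(avg: Vec, rho: Double, numBlocks: Int): Vec
  }

  /** Generic loop shared by every consensus problem; returns the final consensus variable z. */
  def run(updater: Updater, numBlocks: Int, dim: Int, rho: Double, numIterations: Int): Vec = {
    var z = new Array[Double](dim)
    val u = Array.fill(numBlocks)(new Array[Double](dim))
    for (_ <- 0 until numIterations) {
      val x = Array.tabulate(numBlocks)(i => updater.localUpdate(i, z, u(i), rho))
      val avg = Array.tabulate(dim) { j =>
        (0 until numBlocks).map(i => x(i)(j) + u(i)(j)).sum / numBlocks
      }
      z = updater.consensusUpdate(avg, rho, numBlocks)
      // Scaled dual update: u_i = u_i + x_i - z
      for (i <- 0 until numBlocks; j <- 0 until dim) u(i)(j) += x(i)(j) - z(j)
    }
    z
  }

  /** Toy instance: f_i(x) = 0.5 * (x - targets(i))^2 in one dimension, no regularizer. */
  final class QuadraticUpdater(targets: Array[Double]) extends Updater {
    def localUpdate(block: Int, z: Vec, u: Vec, rho: Double): Vec =
      Array((targets(block) + rho * (z(0) - u(0))) / (1.0 + rho))
    def consensusUpdate(avg: Vec, rho: Double, numBlocks: Int): Vec = avg
  }
}
```

With this split, a new ADMM variant only needs to supply its own `Updater`; the toy quadratic instance converges to the mean of its targets.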



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r11833298
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala ---
    @@ -44,8 +44,11 @@ class LassoSuite extends FunSuite with LocalSparkContext {
           .map { case LabeledPoint(label, features) =>
           LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
         }
    +
         val testRDD = sc.parallelize(testData, 2).cache()
     
    +
    --- End diff --
    
    remove a blank line.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r12023345
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMMLasso.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV, DenseMatrix => BDM, cholesky, norm}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.{Partitioner, HashPartitioner, Logging}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.annotation.DeveloperApi
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Class used to solve the optimization problem in ADMMLasso
    + */
    +@DeveloperApi
    +class ADMMLasso
    +  extends Optimizer with Logging
    +{
    +  private var numPartitions: Int = 10
    +  private var numIterations: Int = 100
    +  private var l1RegParam: Double = 1.0
    +  private var l2RegParam: Double = .0
    +  private var penalty: Double = 10.0
    +
    +
    +  /**
    +   * Set the number of partitions for ADMM. Default 10
    +   */
    +  def setNumPartitions(parts: Int): this.type = {
    +    this.numPartitions = parts
    +    this
    +  }
    +
    +  /**
    +   * Set the number of iterations for ADMM. Default 100.
    +   */
    +  def setNumIterations(iters: Int): this.type = {
    +    this.numIterations = iters
    +    this
    +  }
    +
    +  /**
    +   * Set the l1-regularization parameter. Default 1.0.
    +   */
    +  def setL1RegParam(regParam: Double): this.type = {
    +    this.l1RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the l2-regularization parameter. Default .0
    +   */
    +  def setL2RegParam(regParam: Double): this.type = {
    +    this.l2RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the penalty parameter. Default 10.0
    +   */
    +  def setPenalty(penalty: Double): this.type = {
    +    this.penalty = penalty
    +    this
    +  }
    +
    +  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
    +    val (weights, _) = ADMMLasso.runADMM(data, numPartitions, numIterations, l1RegParam,
    +      l2RegParam, penalty, initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + * Top-level method to run ADMMLasso.
    + */
    +@DeveloperApi
    +object ADMMLasso extends Logging {
    +
    +  /**
    +   * @param data  Input data for ADMMLasso. RDD of the set of data examples, each of
    +   *               the form (label, [feature values]).
    +   * @param numPartitions  number of data blocks to partition the RDD into
    +   * @param numIterations  number of iterations that ADMM should be run.
    +   * @param l1RegParam  l1-regularization parameter
    +   * @param l2RegParam  l2-regularization parameter
    +   * @param penalty  The penalty parameter in ADMM
    +   * @param initialWeights  Initial set of weights to be used. Array should be equal in size to
    +   *        the number of features in the data.
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the loss
    +   *         computed for every iteration.
    +   */
    +  def runADMM(
    +      data: RDD[(Double, Vector)],
    +      numPartitions: Int,
    +      numIterations: Int,
    +      l1RegParam: Double,
    +      l2RegParam: Double,
    +      penalty: Double,
    +      initialWeights: Vector): (Vector, Array[Double]) = {
    +
    +    val lossHistory = new ArrayBuffer[Double](numIterations)
    +
    +    // Initialize weights as a column vector
    +    val p = initialWeights.size
    +
    +    // Consensus variable
    +    var z =  BDV.zeros[Double](p)
    +
    +    // Transform the input data into ADMM format
    +    def collectBlock(it: Iterator[(Vector, Double)]):
    +        Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    +      val lab = new ArrayBuffer[Double]()
    +      val features = new ArrayBuffer[Double]()
    +      var row = 0
    +      it.foreach {case (f, l) =>
    +        lab += l
    +        features ++= f.toArray
    +        row += 1
    +      }
    +      val col = features.length/row
    +
    +      val designMatrix = new BDM(col, features.toArray).t
    +
    +      // Precompute the cholesky decomposition for solving linear system inside each partition
    +      val chol = if (row >= col) {
    +        cholesky((designMatrix.t * designMatrix) + (BDM.eye[Double](col) :* penalty))
    +      }
    +      else cholesky(((designMatrix * designMatrix.t) :/ penalty) + BDM.eye[Double](row))
    +
    +      Iterator(((BDV(lab.toArray), designMatrix, chol),
    +        (BDV(initialWeights.toArray), BDV.zeros[Double](col))))
    +    }
    +
    +    val partitionedData = data.map{case (x, y) => (y, x)}
    +      .partitionBy(new HashPartitioner(numPartitions)).cache()
    +
    +    // ((lab, design, chol), (x, u))
    +    var dividedData = partitionedData.mapPartitions(collectBlock, true)
    +
    +    var iter = 1
    +    var minorChange: Boolean = false
    +    while (iter <= numIterations && !minorChange) {
    +      val zBroadcast = z
    +      def localUpdate(
    +           it: Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))]
    --- End diff --
    
    improper indent



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r11833231
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala ---
    @@ -189,3 +230,70 @@ object LassoWithSGD {
         sc.stop()
       }
     }
    +
    +object LassoWithADMM {
    +  /**
    +   * Train a Lasso model given an RDD of (label, features) pairs using ADMM. We run a fixed number
    +   * of outer ADMM iterations. The weights are initialized using the initial weights provided.
    +   *
    +   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
    +   *              matrix A as well as the corresponding right hand side label y
    +   * @param numPartitions Number of data blocks to partition the data into
    +   * @param numIterations Number of iterations of gradient descent to run.
    +   * @param l1RegParam l1-regularization parameter
    +   * @param l2RegParam l2-regularization parameter
    +   * @param penalty ADMM penalty of the constraint
    +   * @param initialWeights set of weights to be used. Array should be equal in size to
    +   *        the number of features in the data.
    +   */
    +  def train(
    +             input: RDD[LabeledPoint],
    --- End diff --
    
    4 characters indent in the definition of function.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r11833315
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala ---
    @@ -113,9 +116,100 @@ class LassoSuite extends FunSuite with LocalSparkContext {
         validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
       }
     
    +  test("ADMM for Lasso") {
    +    val nPoints = 1000
    +
    +    val A = 2.0
    +    val B = -1.5
    +    val C = 1.0e-2
    +
    +    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B, C), nPoints, 42)
    +      .map { case LabeledPoint(label, features) =>
    +      LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
    --- End diff --
    
    another 2-char indent is better.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-40951631
  
    Merged build started. 



[GitHub] spark issue #458: [SPARK-1543][MLlib] Add ADMM for solving Lasso (and elasti...

Posted by luxx92 <gi...@git.apache.org>.
Github user luxx92 commented on the issue:

    https://github.com/apache/spark/pull/458
  
    Hello, could you share your data-generator code with me? I'm working on this project at school and hope
    to compare the results with your dataset. Thank you.





[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by coderxiang <gi...@git.apache.org>.
Github user coderxiang commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-41002133
  
    @yinxusen Just sent the data/code to you. As for the running time, from the web UI it appears to be roughly (nIter * average reduce/aggregate time). The aggregate time for SGD remains almost identical in each iteration, while ADMM's reduce time varies and often takes longer in later iterations. Since ADMM has an early termination criterion, it usually takes fewer than 500 iterations to converge. I may include these details in a follow-up evaluation.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r12023374
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMMLasso.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV, DenseMatrix => BDM, cholesky, norm}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.{Partitioner, HashPartitioner, Logging}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.annotation.DeveloperApi
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Class used to solve the optimization problem in ADMMLasso
    + */
    +@DeveloperApi
    +class ADMMLasso
    +  extends Optimizer with Logging
    +{
    +  private var numPartitions: Int = 10
    +  private var numIterations: Int = 100
    +  private var l1RegParam: Double = 1.0
    +  private var l2RegParam: Double = .0
    +  private var penalty: Double = 10.0
    +
    +
    +  /**
    +   * Set the number of partitions for ADMM. Default 10
    +   */
    +  def setNumPartitions(parts: Int): this.type = {
    +    this.numPartitions = parts
    +    this
    +  }
    +
    +  /**
    +   * Set the number of iterations for ADMM. Default 100.
    +   */
    +  def setNumIterations(iters: Int): this.type = {
    +    this.numIterations = iters
    +    this
    +  }
    +
    +  /**
    +   * Set the l1-regularization parameter. Default 1.0.
    +   */
    +  def setL1RegParam(regParam: Double): this.type = {
    +    this.l1RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the l2-regularization parameter. Default .0
    +   */
    +  def setL2RegParam(regParam: Double): this.type = {
    +    this.l2RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the penalty parameter. Default 10.0
    +   */
    +  def setPenalty(penalty: Double): this.type = {
    +    this.penalty = penalty
    +    this
    +  }
    +
    +  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
    +    val (weights, _) = ADMMLasso.runADMM(data, numPartitions, numIterations, l1RegParam,
    +      l2RegParam, penalty, initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + * Top-level method to run ADMMLasso.
    + */
    +@DeveloperApi
    +object ADMMLasso extends Logging {
    +
    +  /**
    +   * @param data  Input data for ADMMLasso. RDD of the set of data examples, each of
    +   *               the form (label, [feature values]).
    +   * @param numPartitions  number of data blocks to partition the RDD into
    +   * @param numIterations  number of iterations that ADMM should be run.
    +   * @param l1RegParam  l1-regularization parameter
    +   * @param l2RegParam  l2-regularization parameter
    +   * @param penalty  The penalty parameter in ADMM
    +   * @param initialWeights  Initial set of weights to be used. Array should be equal in size to
    +   *        the number of features in the data.
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the loss
    +   *         computed for every iteration.
    +   */
    +  def runADMM(
    +      data: RDD[(Double, Vector)],
    +      numPartitions: Int,
    +      numIterations: Int,
    +      l1RegParam: Double,
    +      l2RegParam: Double,
    +      penalty: Double,
    +      initialWeights: Vector): (Vector, Array[Double]) = {
    +
    +    val lossHistory = new ArrayBuffer[Double](numIterations)
    +
    +    // Initialize weights as a column vector
    +    val p = initialWeights.size
    +
    +    // Consensus variable
    +    var z =  BDV.zeros[Double](p)
    +
    +    // Transform the input data into ADMM format
    +    def collectBlock(it: Iterator[(Vector, Double)]):
    +        Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    +      val lab = new ArrayBuffer[Double]()
    +      val features = new ArrayBuffer[Double]()
    +      var row = 0
    +      it.foreach {case (f, l) =>
    +        lab += l
    +        features ++= f.toArray
    +        row += 1
    +      }
    +      val col = features.length/row
    +
    +      val designMatrix = new BDM(col, features.toArray).t
    +
    +      // Precompute the cholesky decomposition for solving linear system inside each partition
    +      val chol = if (row >= col) {
    +        cholesky((designMatrix.t * designMatrix) + (BDM.eye[Double](col) :* penalty))
    +      }
    +      else cholesky(((designMatrix * designMatrix.t) :/ penalty) + BDM.eye[Double](row))
    +
    +      Iterator(((BDV(lab.toArray), designMatrix, chol),
    +        (BDV(initialWeights.toArray), BDV.zeros[Double](col))))
    +    }
    +
    +    val partitionedData = data.map{case (x, y) => (y, x)}
    +      .partitionBy(new HashPartitioner(numPartitions)).cache()
    +
    +    // ((lab, design, chol), (x, u))
    +    var dividedData = partitionedData.mapPartitions(collectBlock, true)
    +
    +    var iter = 1
    +    var minorChange: Boolean = false
    +    while (iter <= numIterations && !minorChange) {
    +      val zBroadcast = z
    +      def localUpdate(
    +           it: Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))]
    +           ):Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    +        if (it.hasNext) {
    +          val localData = it.next()
    +          val (x, u) = localData._2
    +          val updatedU = u + ((x - zBroadcast) :* penalty)
    +          // Update local x by solving linear system Ax = q
    +          val (lab, design, chol) = localData._1
    +          val (row, col) = (design.rows, design.cols)
    +          val q = (design.t * lab) + (zBroadcast :* penalty) - u
    +
    +          val updatedX = if (row >= col) {
    +            chol.t \ (chol \ q)
    +          } else {
    +            (q :/ penalty) - ((design.t *(chol.t\(chol\(design * q)))) :/ (penalty * penalty))
    +          }
    +          Iterator((localData._1, (updatedX, updatedU)))
    +        }
    +        else {
    +          it
    +        }
    +      }
    +      dividedData = dividedData.mapPartitions(localUpdate).cache()
    --- End diff --
    
    Use a broadcast variable instead of ser/des; the `cache()` can then be saved, since you will use `reduce` to aggregate `last` and `zSum` back to the driver.
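
For context on what the driver does with the aggregated sum, here is a minimal sketch of a consensus (z) update by soft-thresholding. It is not the PR's code: the names `ConsensusUpdate` and `updateZ` are hypothetical, `zSum` is assumed to be the elementwise sum over partitions of `x_i + u_i / penalty` (with `u` the unscaled dual variable, as in the quoted hunk), and the L2 term is assumed to enter the objective as `(l2 / 2) * ||z||^2`.

```scala
object ConsensusUpdate {
  // Soft-thresholding operator: shrinks v toward zero by kappa.
  def softThreshold(v: Double, kappa: Double): Double =
    math.signum(v) * math.max(math.abs(v) - kappa, 0.0)

  // Driver-side z-update (assumed form, see lead-in).
  // Minimizes l1*|z| + (l2/2)*z^2 + (N*rho/2)*(z - zSum/N)^2 per coordinate,
  // which gives z = S_{l1}(rho * zSum) / (l2 + N * rho).
  def updateZ(
      zSum: Array[Double],
      numPartitions: Int,
      l1: Double,
      l2: Double,
      rho: Double): Array[Double] = {
    val nRho = numPartitions * rho
    zSum.map(s => softThreshold(rho * s, l1) / (l2 + nRho))
  }
}
```

With `l2 = 0` this reduces to soft-thresholding the partition average with threshold `l1 / (numPartitions * rho)`, so only a single vector of length p has to travel back to the driver per iteration.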



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by coderxiang <gi...@git.apache.org>.
Github user coderxiang commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-41965253
  
    @yinxusen @mengxr I updated the local solver and the running time has improved substantially. On a cluster with 4 workers, the program converges in 2.3 min for 10k * 10k data and 19 min for 50k * 10k. Most of the computation happens in the preprocessing step that factorizes each design matrix. The computational cost inside each ADMM iteration is now minor.
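
To see why the factorization dominates, here is a sketch of the per-partition x-update. It assumes the local solve has the form shown in the quoted `ADMMLasso.scala` hunk, with A the local design matrix, b the local labels, \rho the penalty, z the consensus variable and u the unscaled dual variable; the updated solver mentioned in this comment may differ.

```latex
\begin{aligned}
q &= A^{\top} b + \rho z - u, \\
x &= (A^{\top} A + \rho I)^{-1} q
   = L^{-\top} L^{-1} q,
   \qquad L L^{\top} = A^{\top} A + \rho I
   && (\text{rows} \ge \text{cols}), \\
x &= \frac{q}{\rho}
   - \frac{1}{\rho^{2}} A^{\top}
     \Bigl(I + \tfrac{1}{\rho} A A^{\top}\Bigr)^{-1} A q
   && (\text{rows} < \text{cols}).
\end{aligned}
```

The second form follows from the matrix inversion lemma. In both cases the Cholesky factor depends only on A and \rho, so it can be computed once (cubic in the smaller matrix dimension), and each iteration then needs only matrix-vector products and two triangular solves.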



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-40991905
  
    @coderxiang It would be better to have a detailed test, as @mengxr said. I can help with the testing if you need.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-41000185
  
    Cool, could you share your data-generator code with me, and let me take care of the `NaN` problem? Besides, could you provide the total running time of SGD and ADMM when they reach a similar loss?



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by kellrott <gi...@git.apache.org>.
Github user kellrott commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-54730627
  
    Is there any work still happening with ADMM in Spark? This patch was rejected and the JIRA issue was closed. Has everyone given up?





[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r12023406
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMMLasso.scala ---
    @@ -0,0 +1,217 @@
    +    var iter = 1
    +    var minorChange: Boolean = false
    +    while (iter <= numIterations && !minorChange) {
    +      val zBroadcast = z
    --- End diff --
    
    Do you mean a real broadcast variable in Spark? If not, `zBroadcast` could be removed, since `=` here is only a shallow copy.
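
A minimal sketch of the shallow-copy point, using a plain `Array[Double]` in place of the Breeze vector; the name `AliasDemo` is hypothetical and this is an illustration, not code from the PR.

```scala
object AliasDemo {
  // Returns (value seen through the alias, value seen through the clone,
  // whether the alias is the very same object as the original).
  def demo(): (Double, Double, Boolean) = {
    val z = Array(1.0, 2.0)
    val zAlias = z          // `=` copies the reference only
    val zCopy = z.clone()   // independent copy of the elements
    z(0) = 42.0             // mutate the original in place
    (zAlias(0), zCopy(0), zAlias eq z)
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

The alias observes the mutation while the clone does not, so an assignment like `val zBroadcast = z` adds a second name for the same object rather than a broadcast or a snapshot.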



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-40951501
  
    Jenkins, test this please.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r12023381
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMMLasso.scala ---
    @@ -0,0 +1,217 @@
    +class ADMMLasso
    +  extends Optimizer with Logging
    +{
    +  private var numPartitions: Int = 10
    +  private var numIterations: Int = 100
    +  private var l1RegParam: Double = 1.0
    +  private var l2RegParam: Double = .0
    +  private var penalty: Double = 10.0
    +
    --- End diff --
    
    remove extra line.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-40958510
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14286/



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-40951619
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-40958505
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by coderxiang <gi...@git.apache.org>.
Github user coderxiang commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-41178782
  
    @yinxusen Thanks for the testing.
     - For Lasso, the label encoding actually does not matter since it is a regression model. I'll take care of this in logistic regression.
     - Normalization is a good point. However, it may be better to provide a separate procedure for it, since normalization may be required by more than just Lasso.
    
    As for the convergence issue of SGD, I guess a more sophisticated SGD algorithm (like dual averaging) may solve this problem.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r11833249
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala ---
    @@ -189,3 +230,70 @@ object LassoWithSGD {
         sc.stop()
       }
     }
    +
    +object LassoWithADMM {
    +  /**
    +   * Train a Lasso model given an RDD of (label, features) pairs using ADMM. We run a fixed number
    +   * of outer ADMM iterations. The weights are initialized using the initial weights provided.
    +   *
    +   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
    +   *              matrix A as well as the corresponding right hand side label y
    +   * @param numPartitions Number of data blocks to partition the data into
    +   * @param numIterations Number of iterations of gradient descent to run.
    +   * @param l1RegParam l1-regularization parameter
    +   * @param l2RegParam l2-regularization parameter
    +   * @param penalty ADMM penalty of the constraint
    +   * @param initialWeights set of weights to be used. Array should be equal in size to
    +   *        the number of features in the data.
    +   */
    +  def train(
    +             input: RDD[LabeledPoint],
    +             numPartitions: Int,
    +             numIterations: Int,
    +             l1RegParam: Double,
    +             l2RegParam: Double,
    +             penalty: Double,
    +             initialWeights: Vector): LassoModel = {
    +    new LassoWithADMM(numPartitions, numIterations, l1RegParam, l2RegParam, penalty)
    +      .run(input, initialWeights)
    +  }
    +
    +  /**
    +   * Train a Lasso model given an RDD of (label, features) pairs using ADMM. We run a fixed number
    +   * of outer ADMM iterations. The weights are initialized using default value.
    +   *
    +   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
    +   *              matrix A as well as the corresponding right hand side label y
    +   * @param numPartitions Number of data blocks to partition the data into
    +   * @param numIterations Number of iterations of gradient descent to run.
    +   * @param l1RegParam l1-regularization parameter
    +   * @param l2RegParam l2-regularization parameter
    +   * @param penalty ADMM penalty of the constraint
    +   */
    +  def train(
    +             input: RDD[LabeledPoint],
    --- End diff --
    
    4-char indent



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by coderxiang <gi...@git.apache.org>.
Github user coderxiang commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-40997301
  
    @mengxr @yinxusen Here are some comparison results between ADMM and SGD. These results hold only for these particular parameter settings and data sets, so no general conclusions should be drawn from them.
    
    The experiments were carried out on a small data set (200 by 200 design matrix) and a large one (10k by 10k design matrix), both randomly generated. Only L1-regularization is employed, with the parameter set to 5 and 20 respectively. At most 500 iterations are run.
    
    | Method                     | loss-small / loss-large | AUC-small / AUC-large |
    | -------------------------- |:-----------------------:|:---------------------:|
    | SGD                        | 96.42 / NaN             | 0.8838 / NaN          |
    | ADMM                       | 93.54 / 4008.55         | 0.8771 / 0.9464       |
    | FISTA (local lasso solver) | 93.52 / 4009.88         | 0.8767 / 0.9481       |
    
    On average, each aggregate step in SGD takes 11s, while each reduce step in ADMM requires 8s, as shown on the web UI. The ROC curve is available [here](https://www.dropbox.com/s/4qhdz3nugzr5kcu/spark_admm_pr.jpg).
    
    I tried two parameter settings (stepsize=0.05/0.01, iter=500/100) for SGD; both seem to run into convergence problems, and the results are shown as NaN. Maybe we can discuss this separately.
    




[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-41148756
  
    @coderxiang I did some experiments on your dataset.
    * For MLlib, you should first rewrite your labels from {+1, -1} to {+1, 0}. [Reference here](http://54.82.240.23:4000/mllib-linear-methods.html#binary-classification)
    * For Lasso, you need to preprocess your dataset so that it has zero mean and unit norm. [Reference here](http://stats.stackexchange.com/questions/19523/need-for-centering-and-standardizing-data-in-regression). @mengxr just removed the former preprocessing because it was not elegant.
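
The two preprocessing steps above can be sketched in plain Scala as follows. This is an illustration under stated assumptions, not MLlib code: the names `Preprocess`, `toZeroOne` and `standardizeColumns` are hypothetical, "zero mean and unit norm" is read as per-column centering followed by scaling each column to unit Euclidean norm, and constant columns are mapped to zero.

```scala
object Preprocess {
  // Map labels from {+1, -1} to {+1, 0}.
  def toZeroOne(label: Double): Double = if (label > 0) 1.0 else 0.0

  // Center each column to zero mean, then scale it to unit Euclidean norm.
  // Columns with zero norm after centering (constant columns) become all zeros.
  def standardizeColumns(rows: Array[Array[Double]]): Array[Array[Double]] = {
    require(rows.nonEmpty, "need at least one row")
    val n = rows.length
    val p = rows.head.length
    val means = Array.tabulate(p)(j => rows.map(_(j)).sum / n)
    val centered = rows.map(r => Array.tabulate(p)(j => r(j) - means(j)))
    val norms = Array.tabulate(p)(j => math.sqrt(centered.map(r => r(j) * r(j)).sum))
    centered.map(r => Array.tabulate(p)(j => if (norms(j) > 0.0) r(j) / norms(j) else 0.0))
  }
}
```

On a distributed data set the column means and norms would be computed with an aggregate pass first; the sketch keeps everything in memory for clarity.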
    
    I opened a [JIRA issue](https://issues.apache.org/jira/browse/SPARK-1585) to explain why `Infinity` occurs. IMHO, I would prefer rewriting [this line]( https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala#L127) as
    
    `brzAxpy(2.0 * diff / weights.size, brzData, cumGradient.toBreeze)`
    
    to take the average, since the gradient is used to update each individual element of the weights. But I am not sure about that; maybe @mengxr and @etrain could give us some suggestions.
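
To make the proposed scaling concrete, here is a small sketch of a squared-loss gradient accumulation with an explicit scale factor. It assumes the referenced line accumulates `2 * diff * features` into the running gradient; `GradientScaling`, `axpy`, `dot` and `addSquaredLossGradient` are hypothetical names written against plain arrays, not the Breeze or MLlib API.

```scala
object GradientScaling {
  // y := a * x + y, in place (the BLAS "axpy" operation).
  def axpy(a: Double, x: Array[Double], y: Array[Double]): Unit = {
    require(x.length == y.length, "vectors must have the same length")
    var i = 0
    while (i < x.length) { y(i) += a * x(i); i += 1 }
  }

  def dot(x: Array[Double], y: Array[Double]): Double =
    x.indices.map(i => x(i) * y(i)).sum

  // Adds the gradient of (w . x - label)^2 for one example to cumGradient,
  // multiplied by `scale`. scale = 1.0 gives the unscaled gradient;
  // scale = 1.0 / weights.length corresponds to the rewrite proposed above.
  def addSquaredLossGradient(
      features: Array[Double],
      label: Double,
      weights: Array[Double],
      cumGradient: Array[Double],
      scale: Double): Unit = {
    val diff = dot(features, weights) - label
    axpy(2.0 * diff * scale, features, cumGradient)
  }
}
```

Dividing by `weights.size` shrinks every gradient component by the same factor, so it acts like a smaller step size; whether that is the right fix for the divergence is the open question raised above.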



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-42160096
  
    L-BFGS is not a good fit for L1 problems. I'm working on OWL-QN, a BFGS variant for L1, and preparing a benchmark with it; it is the ideal method to compare against ADMM.
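
One way to read the remark above: the L1 term is not differentiable at zero, so a plain quasi-Newton method has no gradient to work with there. OWL-QN addresses this with a pseudo-gradient built from one-sided derivatives. The sketch below shows only that ingredient; `OwlqnSketch` and `pseudoGradient` are hypothetical names, and this is not the benchmark code mentioned in the comment.

```scala
// Pseudo-gradient of f(w) = loss(w) + lambda * ||w||_1 (illustrative sketch).
object OwlqnSketch {
  // `lossGrad` is the gradient of the smooth loss at w; lambda >= 0 is the L1 weight.
  def pseudoGradient(w: Array[Double], lossGrad: Array[Double], lambda: Double): Array[Double] =
    w.zip(lossGrad).map { case (wi, gi) =>
      if (wi > 0) gi + lambda                 // |w| has derivative +1 here
      else if (wi < 0) gi - lambda            // |w| has derivative -1 here
      else if (gi + lambda < 0) gi + lambda   // at 0: right derivative is negative, moving right helps
      else if (gi - lambda > 0) gi - lambda   // at 0: left derivative is positive, moving left helps
      else 0.0                                // at 0: zero is optimal for this coordinate
    }
}
```

A coordinate that sits at zero stays there unless the smooth loss gradient exceeds `lambda` in magnitude, which is what lets the method produce exactly sparse weights.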



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-41262172
  
    I preprocessed your data to have zero mean and unit norm, but Lasso still performs poorly, with Infinity results or rising losses.
    
    Since Lasso is a regression method, maybe we should test it with regression data; the current classification dataset is not suitable. @coderxiang Do you have any regression dataset?
    
    More sophisticated SGD variants are worth considering.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r12023410
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMMLasso.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV, DenseMatrix => BDM, cholesky, norm}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.{Partitioner, HashPartitioner, Logging}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.annotation.DeveloperApi
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Class used to solve the optimization problem in ADMMLasso
    + */
    +@DeveloperApi
    +class ADMMLasso
    +  extends Optimizer with Logging
    +{
    +  private var numPartitions: Int = 10
    +  private var numIterations: Int = 100
    +  private var l1RegParam: Double = 1.0
    +  private var l2RegParam: Double = .0
    +  private var penalty: Double = 10.0
    +
    +
    +  /**
    +   * Set the number of partitions for ADMM. Default 10
    +   */
    +  def setNumPartitions(parts: Int): this.type = {
    +    this.numPartitions = parts
    +    this
    +  }
    +
    +  /**
    +   * Set the number of iterations for ADMM. Default 100.
    +   */
    +  def setNumIterations(iters: Int): this.type = {
    +    this.numIterations = iters
    +    this
    +  }
    +
    +  /**
    +   * Set the l1-regularization parameter. Default 1.0.
    +   */
    +  def setL1RegParam(regParam: Double): this.type = {
    +    this.l1RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the l2-regularization parameter. Default .0
    +   */
    +  def setL2RegParam(regParam: Double): this.type = {
    +    this.l2RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the penalty parameter. Default 10.0
    +   */
    +  def setPenalty(penalty: Double): this.type = {
    +    this.penalty = penalty
    +    this
    +  }
    +
    +  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
    +    val (weights, _) = ADMMLasso.runADMM(data, numPartitions, numIterations, l1RegParam,
    +      l2RegParam, penalty, initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + * Top-level method to run ADMMLasso.
    + */
    +@DeveloperApi
    +object ADMMLasso extends Logging {
    +
    +  /**
    +   * @param data  Input data for ADMMLasso. RDD of the set of data examples, each of
    +   *               the form (label, [feature values]).
    +   * @param numPartitions  number of data blocks to partition the RDD into
    +   * @param numIterations  number of iterations that ADMM should be run.
    +   * @param l1RegParam  l1-regularization parameter
    +   * @param l2RegParam  l2-regularization parameter
    +   * @param penalty  The penalty parameter in ADMM
    +   * @param initialWeights  Initial set of weights to be used. Array should be equal in size to
    +   *        the number of features in the data.
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the loss
    +   *         computed for every iteration.
    +   */
    +  def runADMM(
    +      data: RDD[(Double, Vector)],
    +      numPartitions: Int,
    +      numIterations: Int,
    +      l1RegParam: Double,
    +      l2RegParam: Double,
    +      penalty: Double,
    +      initialWeights: Vector): (Vector, Array[Double]) = {
    +
    +    val lossHistory = new ArrayBuffer[Double](numIterations)
    +
    +    // Initialize weights as a column vector
    +    val p = initialWeights.size
    +
    +    // Consensus variable
    +    var z =  BDV.zeros[Double](p)
    +
    +    // Transform the input data into ADMM format
    +    def collectBlock(it: Iterator[(Vector, Double)]):
    +        Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    +      val lab = new ArrayBuffer[Double]()
    +      val features = new ArrayBuffer[Double]()
    +      var row = 0
    +      it.foreach {case (f, l) =>
    +        lab += l
    +        features ++= f.toArray
    +        row += 1
    +      }
    +      val col = features.length/row
    +
    +      val designMatrix = new BDM(col, features.toArray).t
    +
    +      // Precompute the cholesky decomposition for solving linear system inside each partition
    +      val chol = if (row >= col) {
    +        cholesky((designMatrix.t * designMatrix) + (BDM.eye[Double](col) :* penalty))
    +      }
    +      else cholesky(((designMatrix * designMatrix.t) :/ penalty) + BDM.eye[Double](row))
    +
    +      Iterator(((BDV(lab.toArray), designMatrix, chol),
    +        (BDV(initialWeights.toArray), BDV.zeros[Double](col))))
    +    }
    +
    +    val partitionedData = data.map{case (x, y) => (y, x)}
    +      .partitionBy(new HashPartitioner(numPartitions)).cache()
    +
    +    // ((lab, design, chol), (x, u))
    +    var dividedData = partitionedData.mapPartitions(collectBlock, true)
    +
    +    var iter = 1
    +    var minorChange: Boolean = false
    +    while (iter <= numIterations && !minorChange) {
    +      val zBroadcast = z
    +      def localUpdate(
    +           it: Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))]
    +           ):Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    +        if (it.hasNext) {
    +          val localData = it.next()
    +          val (x, u) = localData._2
    +          val updatedU = u + ((x - zBroadcast) :* penalty)
    +          // Update local x by solving linear system Ax = q
    +          val (lab, design, chol) = localData._1
    +          val (row, col) = (design.rows, design.cols)
    +          val q = (design.t * lab) + (zBroadcast :* penalty) - u
    +
    +          val updatedX = if (row >= col) {
    +            chol.t \ (chol \ q)
    +          } else {
    +            (q :/ penalty) - ((design.t *(chol.t\(chol\(design * q)))) :/ (penalty * penalty))
    +          }
    +          Iterator((localData._1, (updatedX, updatedU)))
    +        }
    +        else {
    +          it
    +        }
    +      }
    +      dividedData = dividedData.mapPartitions(localUpdate).cache()
    +      val (last, zSum) = dividedData.map{case (u, v) =>
    +        val (lab, design, chol) = u
    +        val residual = design * zBroadcast - lab
    +        (0.5 * residual.dot(residual), (v._2 :/ penalty) + v._1)
    +      }.reduce{case (x, y) => (x._1 + y._1, x._2 + y._2)}
    --- End diff --
    
    Be careful with `reduce`: it will throw an exception if some of the partitions are empty. Using `aggregate` is better.
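
The difference between reducing and folding from an explicit zero can be illustrated with plain Scala collections, as a stand-in for the RDD calls. This is a sketch under assumptions: `SafeCombine`, `add`, and `combine` are hypothetical names, and a `Seq` plays the role of the per-partition results. As far as I know, `RDD.reduce` fails when the RDD holds no elements at all, while `RDD.aggregate(zeroValue)(seqOp, combOp)` returns the zero value in that case. Note also that `collectBlock` in the diff above computes `features.length/row`, and `row` would be 0 for an empty partition.

```scala
// Combining (loss, vector) pairs from an explicit zero (illustrative sketch).
object SafeCombine {
  type Acc = (Double, Array[Double])

  // Element-wise sum of two (loss, vector) pairs, the same shape the diff reduces over.
  def add(a: Acc, b: Acc): Acc =
    (a._1 + b._1, a._2.zip(b._2).map { case (x, y) => x + y })

  // Folding from a zero element returns that zero for empty input instead of throwing.
  def combine(parts: Seq[Acc], dim: Int): Acc =
    parts.foldLeft((0.0, Array.fill(dim)(0.0)))(add)
}
```

Calling `reduce` on an empty `Seq` throws `UnsupportedOperationException`, whereas `SafeCombine.combine(Seq.empty, dim)` yields a zero loss and a zero vector.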



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r11833194
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala ---
    @@ -87,6 +85,49 @@ class LassoWithSGD private (
     }
     
     /**
    + * Train a regression model with L1-regularization and L2-regularization using
    + * Alternating Direction Method of Multiplier (ADMM).
    + * This solves the l1-regularized least squares regression formulation
    + *    f(weights) = 1/2 ||A weights-y||^2  + l1RegParam ||weights||_1 + l2RegParam/2 ||weights||_2^2
    + * Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with
    + * its corresponding right hand side label y.
    + * See also the documentation for the precise formulation.
    + */
    +class LassoWithADMM private (
    +    var numPartitions: Int,
    +    var numIterations: Int,
    +    var l1RegParam: Double,
    +    var l2RegParam: Double,
    +    var penalty: Double)
    +  extends GeneralizedLinearAlgorithm[LassoModel] with Serializable {
    +
    --- End diff --
    
    A single blank line is good enough. Same below.



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by coderxiang <gi...@git.apache.org>.
Github user coderxiang commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-40997859
  
    @etrain that's a good point if ADMM implementations of other algorithms are going to be added to MLlib. Fortunately, for lasso, ridge regression and sparse logistic regression, the computation on the driver is pretty similar; all we need is to write separate local optimization programs.
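
For reference, the driver-side step shared by these problems is the consensus update. In the textbook consensus form with N partitions and penalty rho, the elastic-net update minimizes `l1 * ||z||_1 + (l2 / 2) * ||z||_2^2 + (N * rho / 2) * ||z - avg||_2^2`, where `avg` is the mean over partitions of `x_i + u_i / rho`. The sketch below implements that closed form; `ConsensusUpdate`, `softThreshold`, and `updateZ` are hypothetical names, and this is not necessarily the exact expression used in the PR.

```scala
// Driver-side consensus update for elastic net (illustrative sketch).
object ConsensusUpdate {
  // Soft-thresholding operator S_k(a) = sign(a) * max(|a| - k, 0).
  def softThreshold(a: Double, k: Double): Double =
    math.signum(a) * math.max(math.abs(a) - k, 0.0)

  // Closed-form minimizer, per coordinate: S_{l1}(N * rho * avg_j) / (l2 + N * rho).
  def updateZ(avg: Array[Double], numParts: Int, rho: Double, l1: Double, l2: Double): Array[Double] = {
    val nRho = numParts * rho
    avg.map(a => softThreshold(nRho * a, l1) / (l2 + nRho))
  }
}
```

With `l2 = 0` this reduces to the plain Lasso update `S_{l1 / (N * rho)}(avg)`, and with `l1 = 0` it is a ridge-style shrinkage of the average.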



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-40951690
  
    @coderxiang Did you compare ADMM and SGD/L-BFGS implemented in MLlib on some large datasets?



[GitHub] spark issue #458: [SPARK-1543][MLlib] Add ADMM for solving Lasso (and elasti...

Posted by debasish83 <gi...@git.apache.org>.
Github user debasish83 commented on the issue:

    https://github.com/apache/spark/pull/458
  
    ADMM is already implemented as part of the Breeze proximal NonlinearMinimizer, where the ADMM solver stays in master and the gradient calculator is used in a similar manner to how Breeze LBFGS/OWLQN has been plugged in. I did not open a PR since OWLQN has been chosen for L1 logistic.



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by ajtulloch <gi...@git.apache.org>.
Github user ajtulloch commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-42758859
  
    I agree with @etrain: it's possible to abstract out the ADMM optimisation routine such that it's trivial to implement L1-logistic regression, lasso, [SVMs](http://web.eecs.umich.edu/~honglak/aistats12-admmDistributedSVM.pdf), etc., with very few additional lines of code. I implemented that for Spark a few months ago (albeit a naive, unperformant, and untested implementation).
    
    If you wanted to see an alternative way of structuring this diff, my code is available at [ajtulloch/spark/SPARK-1794-GenericADMM](https://github.com/ajtulloch/spark/tree/SPARK-1794-GenericADMM).



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r12023346
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMMLasso.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV, DenseMatrix => BDM, cholesky, norm}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.{Partitioner, HashPartitioner, Logging}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.annotation.DeveloperApi
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Class used to solve the optimization problem in ADMMLasso
    + */
    +@DeveloperApi
    +class ADMMLasso
    +  extends Optimizer with Logging
    +{
    +  private var numPartitions: Int = 10
    +  private var numIterations: Int = 100
    +  private var l1RegParam: Double = 1.0
    +  private var l2RegParam: Double = .0
    +  private var penalty: Double = 10.0
    +
    +
    +  /**
    +   * Set the number of partitions for ADMM. Default 10
    +   */
    +  def setNumPartitions(parts: Int): this.type = {
    +    this.numPartitions = parts
    +    this
    +  }
    +
    +  /**
    +   * Set the number of iterations for ADMM. Default 100.
    +   */
    +  def setNumIterations(iters: Int): this.type = {
    +    this.numIterations = iters
    +    this
    +  }
    +
    +  /**
    +   * Set the l1-regularization parameter. Default 1.0.
    +   */
    +  def setL1RegParam(regParam: Double): this.type = {
    +    this.l1RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the l2-regularization parameter. Default .0
    +   */
    +  def setL2RegParam(regParam: Double): this.type = {
    +    this.l2RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the penalty parameter. Default 10.0
    +   */
    +  def setPenalty(penalty: Double): this.type = {
    +    this.penalty = penalty
    +    this
    +  }
    +
    +  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
    +    val (weights, _) = ADMMLasso.runADMM(data, numPartitions, numIterations, l1RegParam,
    +      l2RegParam, penalty, initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + * Top-level method to run ADMMLasso.
    + */
    +@DeveloperApi
    +object ADMMLasso extends Logging {
    +
    +  /**
    +   * @param data  Input data for ADMMLasso. RDD of the set of data examples, each of
    +   *               the form (label, [feature values]).
    +   * @param numPartitions  number of data blocks to partition the RDD into
    +   * @param numIterations  number of iterations that ADMM should be run.
    +   * @param l1RegParam  l1-regularization parameter
    +   * @param l2RegParam  l2-regularization parameter
    +   * @param penalty  The penalty parameter in ADMM
    +   * @param initialWeights  Initial set of weights to be used. Array should be equal in size to
    +   *        the number of features in the data.
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the loss
    +   *         computed for every iteration.
    +   */
    +  def runADMM(
    +      data: RDD[(Double, Vector)],
    +      numPartitions: Int,
    +      numIterations: Int,
    +      l1RegParam: Double,
    +      l2RegParam: Double,
    +      penalty: Double,
    +      initialWeights: Vector): (Vector, Array[Double]) = {
    +
    +    val lossHistory = new ArrayBuffer[Double](numIterations)
    +
    +    // Initialize weights as a column vector
    +    val p = initialWeights.size
    +
    +    // Consensus variable
    +    var z =  BDV.zeros[Double](p)
    +
    +    // Transform the input data into ADMM format
    +    def collectBlock(it: Iterator[(Vector, Double)]):
    +        Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    +      val lab = new ArrayBuffer[Double]()
    +      val features = new ArrayBuffer[Double]()
    +      var row = 0
    +      it.foreach {case (f, l) =>
    +        lab += l
    +        features ++= f.toArray
    +        row += 1
    +      }
    +      val col = features.length/row
    +
    +      val designMatrix = new BDM(col, features.toArray).t
    +
    +      // Precompute the cholesky decomposition for solving linear system inside each partition
    +      val chol = if (row >= col) {
    +        cholesky((designMatrix.t * designMatrix) + (BDM.eye[Double](col) :* penalty))
    +      }
    +      else cholesky(((designMatrix * designMatrix.t) :/ penalty) + BDM.eye[Double](row))
    +
    +      Iterator(((BDV(lab.toArray), designMatrix, chol),
    +        (BDV(initialWeights.toArray), BDV.zeros[Double](col))))
    +    }
    +
    +    val partitionedData = data.map{case (x, y) => (y, x)}
    +      .partitionBy(new HashPartitioner(numPartitions)).cache()
    +
    +    // ((lab, design, chol), (x, u))
    +    var dividedData = partitionedData.mapPartitions(collectBlock, true)
    +
    +    var iter = 1
    +    var minorChange: Boolean = false
    +    while (iter <= numIterations && !minorChange) {
    +      val zBroadcast = z
    +      def localUpdate(
    +           it: Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))]
    +           ):Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    --- End diff --
    
    same here



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-40994263
  
    Hey, this looks awesome! One high-level issue I see is that the ADMM optimizer has knowledge of the loss function it's trying to minimize embedded in it. ADMM is much more general than that and is nicely scalable. Can we abstract out the general ADMM computation pattern, in a spirit similar to what we've done with GradientDescent, and have Lasso, SVM, etc. done with ADMM as subclasses that implement a specialized "compute" function (or something)?
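
A rough idea of what such an abstraction could look like, in plain Scala with in-memory "partitions" instead of an RDD. Everything here is an assumption made for illustration: `GenericAdmm`, `LocalSolver`, `QuadraticSolver`, and `run` are hypothetical names, the toy loss is `0.5 * ||x - target||^2`, and the loop uses the standard x, z, u update order with an unscaled dual variable rather than the ordering in the diff.

```scala
// Generic consensus ADMM with a pluggable local solver (illustrative sketch).
object GenericAdmm {
  // Problem-specific piece: solve the local subproblem
  //   argmin_x f_i(x) + u . (x - z) + (rho / 2) * ||x - z||^2
  trait LocalSolver {
    def updateX(z: Array[Double], u: Array[Double], rho: Double): Array[Double]
  }

  // Toy loss f_i(x) = 0.5 * ||x - target||^2, whose subproblem has the closed form
  //   x = (target - u + rho * z) / (1 + rho).
  final class QuadraticSolver(target: Array[Double]) extends LocalSolver {
    def updateX(z: Array[Double], u: Array[Double], rho: Double): Array[Double] =
      Array.tabulate(z.length)(j => (target(j) - u(j) + rho * z(j)) / (1.0 + rho))
  }

  // Problem-independent loop; `updateZ` maps the average of (x_i + u_i / rho) to the new z.
  def run(
      solvers: Seq[LocalSolver],
      dim: Int,
      rho: Double,
      iters: Int,
      updateZ: Array[Double] => Array[Double]): Array[Double] = {
    val n = solvers.length
    var z = Array.fill(dim)(0.0)
    var us = Seq.fill(n)(Array.fill(dim)(0.0))
    for (_ <- 0 until iters) {
      // Local x-updates: one per partition, independent of each other.
      val xs = solvers.zip(us).map { case (s, u) => s.updateX(z, u, rho) }
      // Consensus step on the driver.
      val avg = Array.tabulate(dim) { j =>
        xs.zip(us).map { case (x, u) => x(j) + u(j) / rho }.sum / n
      }
      val zNew = updateZ(avg)
      // Dual update: u_i += rho * (x_i - z).
      us = xs.zip(us).map { case (x, u) =>
        Array.tabulate(dim)(j => u(j) + rho * (x(j) - zNew(j)))
      }
      z = zNew
    }
    z
  }
}
```

With the identity as `updateZ` (no regularizer) and two `QuadraticSolver` instances, the consensus variable approaches the mean of the two targets. Swapping in a different `LocalSolver` or a thresholding `updateZ` changes the problem without touching the loop.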



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r12023354
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMMLasso.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV, DenseMatrix => BDM, cholesky, norm}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.{Partitioner, HashPartitioner, Logging}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.annotation.DeveloperApi
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Class used to solve the optimization problem in ADMMLasso
    + */
    +@DeveloperApi
    +class ADMMLasso
    +  extends Optimizer with Logging
    +{
    +  private var numPartitions: Int = 10
    +  private var numIterations: Int = 100
    +  private var l1RegParam: Double = 1.0
    +  private var l2RegParam: Double = .0
    +  private var penalty: Double = 10.0
    +
    +
    +  /**
    +   * Set the number of partitions for ADMM. Default 10
    +   */
    +  def setNumPartitions(parts: Int): this.type = {
    +    this.numPartitions = parts
    +    this
    +  }
    +
    +  /**
    +   * Set the number of iterations for ADMM. Default 100.
    +   */
    +  def setNumIterations(iters: Int): this.type = {
    +    this.numIterations = iters
    +    this
    +  }
    +
    +  /**
    +   * Set the l1-regularization parameter. Default 1.0.
    +   */
    +  def setL1RegParam(regParam: Double): this.type = {
    +    this.l1RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the l2-regularization parameter. Default .0
    +   */
    +  def setL2RegParam(regParam: Double): this.type = {
    +    this.l2RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the penalty parameter. Default 10.0
    +   */
    +  def setPenalty(penalty: Double): this.type = {
    +    this.penalty = penalty
    +    this
    +  }
    +
    +  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
    +    val (weights, _) = ADMMLasso.runADMM(data, numPartitions, numIterations, l1RegParam,
    +      l2RegParam, penalty, initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + * Top-level method to run ADMMLasso.
    + */
    +@DeveloperApi
    +object ADMMLasso extends Logging {
    +
    +  /**
    +   * @param data  Input data for ADMMLasso. RDD of the set of data examples, each of
    +   *               the form (label, [feature values]).
    +   * @param numPartitions  number of data blocks to partition the RDD into
    +   * @param numIterations  number of iterations that ADMM should be run.
    +   * @param l1RegParam  l1-regularization parameter
    +   * @param l2RegParam  l2-regularization parameter
    +   * @param penalty  The penalty parameter in ADMM
    +   * @param initialWeights  Initial set of weights to be used. Array should be equal in size to
    +   *        the number of features in the data.
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the loss
    +   *         computed for every iteration.
    +   */
    +  def runADMM(
    +      data: RDD[(Double, Vector)],
    +      numPartitions: Int,
    +      numIterations: Int,
    +      l1RegParam: Double,
    +      l2RegParam: Double,
    +      penalty: Double,
    +      initialWeights: Vector): (Vector, Array[Double]) = {
    +
    +    val lossHistory = new ArrayBuffer[Double](numIterations)
    +
    +    // Initialize weights as a column vector
    +    val p = initialWeights.size
    +
    +    // Consensus variable
    +    var z =  BDV.zeros[Double](p)
    +
    +    // Transform the input data into ADMM format
    +    def collectBlock(it: Iterator[(Vector, Double)]):
    +        Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    +      val lab = new ArrayBuffer[Double]()
    +      val features = new ArrayBuffer[Double]()
    +      var row = 0
    +      it.foreach {case (f, l) =>
    +        lab += l
    +        features ++= f.toArray
    +        row += 1
    +      }
    +      val col = features.length/row
    +
    +      val designMatrix = new BDM(col, features.toArray).t
    +
    +      // Precompute the cholesky decomposition for solving linear system inside each partition
    +      val chol = if (row >= col) {
    +        cholesky((designMatrix.t * designMatrix) + (BDM.eye[Double](col) :* penalty))
    +      }
    +      else cholesky(((designMatrix * designMatrix.t) :/ penalty) + BDM.eye[Double](row))
    +
    +      Iterator(((BDV(lab.toArray), designMatrix, chol),
    +        (BDV(initialWeights.toArray), BDV.zeros[Double](col))))
    +    }
    +
    +    val partitionedData = data.map{case (x, y) => (y, x)}
    +      .partitionBy(new HashPartitioner(numPartitions)).cache()
    +
    +    // ((lab, design, chol), (x, u))
    +    var dividedData = partitionedData.mapPartitions(collectBlock, true)
    +
    +    var iter = 1
    +    var minorChange: Boolean = false
    +    while (iter <= numIterations && !minorChange) {
    +      val zBroadcast = z
    +      def localUpdate(
    --- End diff --
    
    How about moving this function definition out of while loop?



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r11833060
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMMLasso.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV, DenseMatrix => BDM, cholesky, norm}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.{Partitioner, HashPartitioner, Logging}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.annotation.DeveloperApi
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Class used to solve the optimization problem in ADMMLasso
    + */
    +@DeveloperApi
    +class ADMMLasso
    +  extends Optimizer with Logging
    +{
    +  private var numPartitions: Int = 10
    +  private var numIterations: Int = 100
    +  private var l1RegParam: Double = 1.0
    +  private var l2RegParam: Double = .0
    +  private var penalty: Double = 10.0
    +
    +
    +  /**
    +   * Set the number of partitions for ADMM. Default 10
    +   */
    +  def setNumPartitions(parts: Int): this.type = {
    +    this.numPartitions = parts
    +    this
    +  }
    +
    +  /**
    +   * Set the number of iterations for ADMM. Default 100.
    +   */
    +  def setNumIterations(iters: Int): this.type = {
    +    this.numIterations = iters
    +    this
    +  }
    +
    +  /**
    +   * Set the l1-regularization parameter. Default 1.0.
    +   */
    +  def setL1RegParam(regParam: Double): this.type = {
    +    this.l1RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the l2-regularization parameter. Default .0
    +   */
    +  def setL2RegParam(regParam: Double): this.type = {
    +    this.l2RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the penalty parameter. Default 10.0
    +   */
    +  def setPenalty(penalty: Double): this.type = {
    +    this.penalty = penalty
    +    this
    +  }
    +
    +  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
    +    val (weights, _) = ADMMLasso.runADMM(data, numPartitions, numIterations, l1RegParam,
    +      l2RegParam, penalty, initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + * Top-level method to run ADMMLasso.
    + */
    +@DeveloperApi
    +object ADMMLasso extends Logging {
    +
    +  /**
    +   * @param data  Input data for ADMMLasso. RDD of the set of data examples, each of
    +   *               the form (label, [feature values]).
    +   * @param numPartitions  number of data blocks to partition the RDD into
    +   * @param numIterations  number of iterations that ADMM should be run.
    +   * @param l1RegParam  l1-regularization parameter
    +   * @param l2RegParam  l2-regularization parameter
    +   * @param penalty  The penalty parameter in ADMM
    +   * @param initialWeights  Initial set of weights to be used. Array should be equal in size to
    +   *        the number of features in the data.
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the loss
    +   *         computed for every iteration.
    +   */
    +  def runADMM(
    +      data: RDD[(Double, Vector)],
    +      numPartitions: Int,
    +      numIterations: Int,
    +      l1RegParam: Double,
    +      l2RegParam: Double,
    +      penalty: Double,
    +      initialWeights: Vector): (Vector, Array[Double]) = {
    +
    +    val lossHistory = new ArrayBuffer[Double](numIterations)
    +
    +    // Initialize weights as a column vector
    +    val p = initialWeights.size
    +
    +    // Consensus variable
    +    var z =  BDV.zeros[Double](p)
    +
    +    // Transform the input data into ADMM format
    +    def collectBlock(it: Iterator[(Vector, Double)]):
    +        Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    +      val lab = new ArrayBuffer[Double]()
    +      val features = new ArrayBuffer[Double]()
    +      var row = 0
    +      it.foreach {case (f, l) =>
    +        lab += l
    +        features ++= f.toArray
    +        row += 1
    +      }
    +      val col = features.length/row
    +
    +      val designMatrix = new BDM(col, features.toArray).t
    +
    +      // Precompute the cholesky decomposition for solving linear system inside each partition
    +      val chol = if (row >= col) {
    +        cholesky((designMatrix.t * designMatrix) + (BDM.eye[Double](col) :* penalty))
    +      }
    +      else cholesky(((designMatrix * designMatrix.t) :/ penalty) + BDM.eye[Double](row))
    +
    +      Iterator(((BDV(lab.toArray), designMatrix, chol),
    +        (BDV(initialWeights.toArray), BDV.zeros[Double](col))))
    +    }
    +
    +    val partitionedData = data.map{case (x, y) => (y, x)}
    +      .partitionBy(new HashPartitioner(numPartitions)).cache()
    +
    +    // ((lab, design, chol), (x, u))
    +    var dividedData = partitionedData.mapPartitions(collectBlock, true)
    +
    +    var iter = 1
    +    var minorChange: Boolean = false
    +    while (iter <= numIterations && !minorChange) {
    +      val zBroadcast = z
    +      def localUpdate(
    +           it: Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))]):
    --- End diff --
    
    It is better to put the `):` on the next line, together with the return type.
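
    One possible reading of this layout, as a sketch only. The `State` alias and the `SignatureLayout` object are hypothetical and are introduced just to keep the lines short; the body is a pass-through placeholder, not the real update:

    ```scala
    object SignatureLayout {
      // Hypothetical alias standing in for the long tuple type in the patch.
      type State = (Array[Double], Array[Double])

      // The parameter list ends on its own line, and `):` sits with the return type.
      def localUpdate(
          it: Iterator[State]
        ): Iterator[State] = {
        // Placeholder body: returns each element unchanged.
        it.map { case (x, u) => (x, u) }
      }
    }
    ```

    A type alias of this kind is a separate design choice; it could shorten the signature enough that the question of where to break the line matters less.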



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/458#discussion_r12023367
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/ADMMLasso.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV, DenseMatrix => BDM, cholesky, norm}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.{Partitioner, HashPartitioner, Logging}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.annotation.DeveloperApi
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Class used to solve the optimization problem in ADMMLasso
    + */
    +@DeveloperApi
    +class ADMMLasso
    +  extends Optimizer with Logging
    +{
    +  private var numPartitions: Int = 10
    +  private var numIterations: Int = 100
    +  private var l1RegParam: Double = 1.0
    +  private var l2RegParam: Double = .0
    +  private var penalty: Double = 10.0
    +
    +
    +  /**
    +   * Set the number of partitions for ADMM. Default 10
    +   */
    +  def setNumPartitions(parts: Int): this.type = {
    +    this.numPartitions = parts
    +    this
    +  }
    +
    +  /**
    +   * Set the number of iterations for ADMM. Default 100.
    +   */
    +  def setNumIterations(iters: Int): this.type = {
    +    this.numIterations = iters
    +    this
    +  }
    +
    +  /**
    +   * Set the l1-regularization parameter. Default 1.0.
    +   */
    +  def setL1RegParam(regParam: Double): this.type = {
    +    this.l1RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the l2-regularization parameter. Default .0
    +   */
    +  def setL2RegParam(regParam: Double): this.type = {
    +    this.l2RegParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the penalty parameter. Default 10.0
    +   */
    +  def setPenalty(penalty: Double): this.type = {
    +    this.penalty = penalty
    +    this
    +  }
    +
    +  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
    +    val (weights, _) = ADMMLasso.runADMM(data, numPartitions, numIterations, l1RegParam,
    +      l2RegParam, penalty, initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + * Top-level method to run ADMMLasso.
    + */
    +@DeveloperApi
    +object ADMMLasso extends Logging {
    +
    +  /**
    +   * @param data  Input data for ADMMLasso. RDD of the set of data examples, each of
    +   *               the form (label, [feature values]).
    +   * @param numPartitions  number of data blocks to partition the RDD into
    +   * @param numIterations  number of iterations that ADMM should be run.
    +   * @param l1RegParam  l1-regularization parameter
    +   * @param l2RegParam  l2-regularization parameter
    +   * @param penalty  The penalty parameter in ADMM
    +   * @param initialWeights  Initial set of weights to be used. Array should be equal in size to
    +   *        the number of features in the data.
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the loss
    +   *         computed for every iteration.
    +   */
    +  def runADMM(
    +      data: RDD[(Double, Vector)],
    +      numPartitions: Int,
    +      numIterations: Int,
    +      l1RegParam: Double,
    +      l2RegParam: Double,
    +      penalty: Double,
    +      initialWeights: Vector): (Vector, Array[Double]) = {
    +
    +    val lossHistory = new ArrayBuffer[Double](numIterations)
    +
    +    // Initialize weights as a column vector
    +    val p = initialWeights.size
    +
    +    // Consensus variable
    +    var z =  BDV.zeros[Double](p)
    +
    +    // Transform the input data into ADMM format
    +    def collectBlock(it: Iterator[(Vector, Double)]):
    +        Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    +      val lab = new ArrayBuffer[Double]()
    +      val features = new ArrayBuffer[Double]()
    +      var row = 0
    +      it.foreach {case (f, l) =>
    +        lab += l
    +        features ++= f.toArray
    +        row += 1
    +      }
    +      val col = features.length/row
    +
    +      val designMatrix = new BDM(col, features.toArray).t
    +
    +      // Precompute the cholesky decomposition for solving linear system inside each partition
    +      val chol = if (row >= col) {
    +        cholesky((designMatrix.t * designMatrix) + (BDM.eye[Double](col) :* penalty))
    +      }
    +      else cholesky(((designMatrix * designMatrix.t) :/ penalty) + BDM.eye[Double](row))
    +
    +      Iterator(((BDV(lab.toArray), designMatrix, chol),
    +        (BDV(initialWeights.toArray), BDV.zeros[Double](col))))
    +    }
    +
    +    val partitionedData = data.map{case (x, y) => (y, x)}
    +      .partitionBy(new HashPartitioner(numPartitions)).cache()
    +
    +    // ((lab, design, chol), (x, u))
    +    var dividedData = partitionedData.mapPartitions(collectBlock, true)
    +
    +    var iter = 1
    +    var minorChange: Boolean = false
    +    while (iter <= numIterations && !minorChange) {
    +      val zBroadcast = z
    +      def localUpdate(
    +           it: Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))]
    +           ):Iterator[((BDV[Double], BDM[Double], BDM[Double]), (BDV[Double], BDV[Double]))] = {
    +        if (it.hasNext) {
    +          val localData = it.next()
    +          val (x, u) = localData._2
    +          val updatedU = u + ((x - zBroadcast) :* penalty)
    +          // Update local x by solving linear system Ax = q
    +          val (lab, design, chol) = localData._1
    +          val (row, col) = (design.rows, design.cols)
    +          val q = (design.t * lab) + (zBroadcast :* penalty) - u
    +
    +          val updatedX = if (row >= col) {
    +            chol.t \ (chol \ q)
    +          } else {
    +            (q :/ penalty) - ((design.t *(chol.t\(chol\(design * q)))) :/ (penalty * penalty))
    +          }
    +          Iterator((localData._1, (updatedX, updatedU)))
    --- End diff --
    
    `localData._1` is not modified in the function. How about using a broadcast variable instead of passing it along with the iteration state? Serializing and deserializing unchanged data on every iteration slows the procedure down, especially when the local design matrix is large.
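
    A related alternative, sketched under assumptions and not taken from the patch: keep the unchanged block and the per-iteration state in separate collections, so that the update returns only the new state. `SplitBlocks`, `Block`, `State`, and `iterate` are hypothetical names, and plain Scala collections stand in for RDDs. On Spark, the blocks could live in a cached RDD and be paired with the state RDD through `RDD.zipPartitions`:

    ```scala
    object SplitBlocks {
      // Hypothetical, simplified types. Block stands for the data that never
      // changes (labels, design matrix, Cholesky factor); State stands for (x, u).
      final case class Block(labels: Seq[Double])
      final case class State(x: Seq[Double], u: Seq[Double])

      // One iteration over all partitions. Each update reads its block but
      // returns only a new State, so blocks are never copied into the result.
      def iterate(blocks: Seq[Block], states: Seq[State])(
          update: (Block, State) => State): Seq[State] = {
        require(blocks.length == states.length, "expected one state per block")
        blocks.zip(states).map { case (b, s) => update(b, s) }
      }
    }
    ```

    The caller supplies `update`, which would hold the x- and u-updates from `localUpdate`; only the small `State` values change between iterations.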



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by coderxiang <gi...@git.apache.org>.
Github user coderxiang closed the pull request at:

    https://github.com/apache/spark/pull/458



[GitHub] spark pull request: [SPARK-1543][MLlib] Add ADMM for solving Lasso...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/458#issuecomment-40905883
  
    Can one of the admins verify this patch?

