You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by yinxusen <gi...@git.apache.org> on 2014/03/18 07:43:35 UTC

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

GitHub user yinxusen opened a pull request:

    https://github.com/apache/spark/pull/166

    [WIP] [MLLIB-28] An optimized GradientDescent implementation 

    New JIRA issue [MLLIB-28](https://spark-project.atlassian.net/browse/MLLIB-28) with this pull request bring a new implementation of `GradientDescent` named `GradientDescentWithLocalUpdate`. The `GradientDescentWithLocalUpdate` can outperform the original `GradientDescent` by about 1x ~ 4x without sacrificing accuracy, and can be easily adopted by most classification and regression algorithms in MLlib.
    
    Parallelism of many ML algorithms are limited by the sequential updating process of optimization algorithms they use. However, by carefully breaking the sequential chain, the updating process can be parallelized. In the`GradientDescentWithLocalUpdate` , we split the iteration loop into multiple supersteps. Within each superstep, an inner loop that runs a local optimization process is introduced into each partition. During the local optimization, only local data points in the partition are involved. Since different partitions are processed in parallel, the local optimization process is natually parallelized. Then, at the end of each superstep, all the gradients and loss histories computed from each partition are collected and merged in a bulk synchronous manner.
    
    Detailed experiments and results in the original [pull request](https://github.com/apache/incubator-spark/pull/407) and [comments](https://github.com/apache/incubator-spark/pull/407#issuecomment-33356196).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yinxusen/spark gradient-local-update

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/166.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #166
    
----
commit 9b8dd56663fae556549a61f265996cbd3414f35e
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-03-03T10:30:30Z

    add new optimizer for GradientDescent, with local updater

commit 881ea122188562830f197392addc6f69232c8736
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-03-18T05:49:27Z

    Merge branch 'master' into gradient-local-update

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-38298672
  
    One or more automated tests failed
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13322/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-38881872
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10696719
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    +  private var numLocalIterations: Int = 1
    +
    +  /**
    +   * Set the number of local iterations. Default 1.
    +   */
    +  def setNumLocalIterations(numLocalIter: Int): this.type = {
    +    this.numLocalIterations = numLocalIter
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    +    : Array[Double] = {
    +
    +    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
    +        data,
    +        gradient,
    +        updater,
    +        stepSize,
    +        numIterations,
    +        numLocalIterations,
    +        regParam,
    +        miniBatchFraction,
    +        initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +// Top-level method to run gradient descent.
    +object GradientDescentWithLocalUpdate extends Logging {
    +   /**
    +   * Run BSP+ gradient descent in parallel using mini batches.
    +   *
    +   * @param data - Input data for SGD. RDD of form (label, [feature values]).
    +   * @param gradient - Gradient object that will be used to compute the gradient.
    +   * @param updater - Updater object that will be used to update the model.
    +   * @param stepSize - stepSize to be used during update.
    +   * @param numOuterIterations - number of outer iterations that SGD should be run.
    +   * @param numInnerIterations - number of inner iterations that SGD should be run.
    +   * @param regParam - regularization parameter
    +   * @param miniBatchFraction - fraction of the input data set that should be used for
    +   *                            one iteration of SGD. Default value 1.0.
    +   *
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the stochastic
    +   *         loss computed for every iteration.
    +   */
    +  def runMiniBatchSGD(
    +      data: RDD[(Double, Array[Double])],
    +      gradient: Gradient,
    +      updater: Updater,
    +      stepSize: Double,
    +      numOuterIterations: Int,
    +      numInnerIterations: Int,
    +      regParam: Double,
    +      miniBatchFraction: Double,
    +      initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
    +
    +    val stochasticLossHistory = new ArrayBuffer[Double](numOuterIterations)
    +
    +    val numExamples: Long = data.count()
    +    val numPartition = data.partitions.length
    +    val miniBatchSize = numExamples * miniBatchFraction / numPartition
    +
    +    // Initialize weights as a column vector
    +    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights: _*)
    +    var regVal = 0.0
    +
    +    for (i <- 1 to numOuterIterations) {
    +      val weightsAndLosses = data.mapPartitions { iter =>
    +        var iterReserved = iter
    +        val localLossHistory = new ArrayBuffer[Double](numInnerIterations)
    +
    +        for (j <- 1 to numInnerIterations) {
    +          val (iterCurrent, iterNext) = iterReserved.duplicate
    +          val rand = new Random(42 + i * numOuterIterations + j)
    +          val sampled = iterCurrent.filter(x => rand.nextDouble() <= miniBatchFraction)
    +          val (gradientSum, lossSum) = sampled.map { case (y, features) =>
    +            val featuresCol = new DoubleMatrix(features.length, 1, features: _*)
    +            val (grad, loss) = gradient.compute(featuresCol, y, weights)
    +            (grad, loss)
    +          }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
    +
    +          localLossHistory += lossSum / miniBatchSize + regVal
    +
    +          val update = updater.compute(weights, gradientSum.div(miniBatchSize),
    +            stepSize, (i - 1) + numOuterIterations + j, regParam)
    +
    +          weights = update._1
    +          regVal = update._2
    +
    +          iterReserved = iterNext
    +        }
    +
    +        List((weights, localLossHistory.toArray)).iterator
    +      }
    +
    +      val c = weightsAndLosses.collect()
    +      val (ws, ls) = c.unzip
    +
    +      stochasticLossHistory.append(ls.head.reduce(_ + _) / ls.head.size)
    +
    +      val weightsSum = ws.reduce(_ addi _)
    +      weights = weightsSum.divi(c.size)
    +    }
    +
    +    logInfo("GradientDescentWithLocalUpdate finished. Last 10 stochastic losses %s".format(
    --- End diff --
    
    How about changing 10 to "a few"? Length of `stochasticLossHistory` may be less than 10 :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10696327
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.scalatest.FunSuite
    +import org.scalatest.matchers.ShouldMatchers
    +
    +import org.apache.spark.mllib.regression._
    +import org.apache.spark.mllib.util.LocalSparkContext
    +
    +class GradientDescentWithLocalUpdateSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
    --- End diff --
    
    100 columns exceeded.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10696552
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    +  private var numLocalIterations: Int = 1
    +
    +  /**
    +   * Set the number of local iterations. Default 1.
    +   */
    +  def setNumLocalIterations(numLocalIter: Int): this.type = {
    +    this.numLocalIterations = numLocalIter
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    +    : Array[Double] = {
    +
    +    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
    +        data,
    +        gradient,
    +        updater,
    +        stepSize,
    +        numIterations,
    +        numLocalIterations,
    +        regParam,
    +        miniBatchFraction,
    +        initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +// Top-level method to run gradient descent.
    +object GradientDescentWithLocalUpdate extends Logging {
    +   /**
    +   * Run BSP+ gradient descent in parallel using mini batches.
    +   *
    +   * @param data - Input data for SGD. RDD of form (label, [feature values]).
    +   * @param gradient - Gradient object that will be used to compute the gradient.
    +   * @param updater - Updater object that will be used to update the model.
    +   * @param stepSize - stepSize to be used during update.
    +   * @param numOuterIterations - number of outer iterations that SGD should be run.
    +   * @param numInnerIterations - number of inner iterations that SGD should be run.
    +   * @param regParam - regularization parameter
    +   * @param miniBatchFraction - fraction of the input data set that should be used for
    +   *                            one iteration of SGD. Default value 1.0.
    +   *
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the stochastic
    +   *         loss computed for every iteration.
    +   */
    +  def runMiniBatchSGD(
    +      data: RDD[(Double, Array[Double])],
    +      gradient: Gradient,
    +      updater: Updater,
    +      stepSize: Double,
    +      numOuterIterations: Int,
    +      numInnerIterations: Int,
    +      regParam: Double,
    +      miniBatchFraction: Double,
    +      initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
    +
    +    val stochasticLossHistory = new ArrayBuffer[Double](numOuterIterations)
    +
    +    val numExamples: Long = data.count()
    +    val numPartition = data.partitions.length
    +    val miniBatchSize = numExamples * miniBatchFraction / numPartition
    +
    +    // Initialize weights as a column vector
    +    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights: _*)
    +    var regVal = 0.0
    +
    +    for (i <- 1 to numOuterIterations) {
    +      val weightsAndLosses = data.mapPartitions { iter =>
    +        var iterReserved = iter
    +        val localLossHistory = new ArrayBuffer[Double](numInnerIterations)
    +
    +        for (j <- 1 to numInnerIterations) {
    +          val (iterCurrent, iterNext) = iterReserved.duplicate
    +          val rand = new Random(42 + i * numOuterIterations + j)
    +          val sampled = iterCurrent.filter(x => rand.nextDouble() <= miniBatchFraction)
    +          val (gradientSum, lossSum) = sampled.map { case (y, features) =>
    +            val featuresCol = new DoubleMatrix(features.length, 1, features: _*)
    +            val (grad, loss) = gradient.compute(featuresCol, y, weights)
    +            (grad, loss)
    +          }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
    --- End diff --
    
    An edge case: do we need to consider empty partition here (`reduce` complains about empty collection)? We can use `reduceOption` together with default values for `gradientSum` and `lossSum`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-37906515
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13229/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-37939348
  
    Left some comments on minor issues like formatting. LGTM otherwise.
    
    @srowen According to [previous experiments](https://github.com/apache/incubator-spark/pull/407), this implementation is indeed better than the original one, +1 for the replacement.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10696072
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    +  private var numLocalIterations: Int = 1
    +
    +  /**
    +   * Set the number of local iterations. Default 1.
    +   */
    +  def setNumLocalIterations(numLocalIter: Int): this.type = {
    +    this.numLocalIterations = numLocalIter
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    +    : Array[Double] = {
    +
    +    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
    +        data,
    +        gradient,
    +        updater,
    +        stepSize,
    +        numIterations,
    +        numLocalIterations,
    +        regParam,
    +        miniBatchFraction,
    +        initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +// Top-level method to run gradient descent.
    +object GradientDescentWithLocalUpdate extends Logging {
    +   /**
    +   * Run BSP+ gradient descent in parallel using mini batches.
    --- End diff --
    
    Hmm... since BSP+ is not a well known concept and there's no related references (yet), maybe we should not use this term? Any suggestions @mengxr?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-40668271
  
    I rewrite the 2 versions of `GradientDescent` with `Vector` instead of `Array`. Lasso is easy to test now thanks for @mengxr 's refactoring of code.
    
    I run the test on a single node, in local mode. Note that original version runs 100 iterations, while the other two run 10 iterations with 10 local iterations.
    
    latest update:
    
    | Type        | Time           | Last 10 losses  |
    | ------------- |:-------------:| -----:|
    | original LR     | 346 | 0.6444 - 0.6430 |
    | 1-version LR     | 45      |  0.7082-0.6773  |
    | **2-version LR** | **37**     | **0.7070-0.6817**    |
    | original SVM     | 338 | 0.9468 - 0.9468 |
    | 1-version SVM | 46 |  0.7861 - 0.7803  |
    | **2-version SVM** | **34** | **0.7875 - 0.7829** |
    | original Lasso | 320 | 0.6063 - 0.6063 |
    | 1-version Lasso | 39  |  0.6131 - 0.2062 |
    | **2-version Lasso** | **32** | **0.6062 - 0.2104** |
    
    1-version is not good due to the reuse of `Iterator`, which inherently store all elements in a queue and will cause OOM if the data entry in a partition is large enough. 2-version is better, but due to the tiny-batch property, 2-version `GradientDescent`'s convergence ability is slightly lower than the 1-version. There is a trade-off between hardware efficiency and statistical efficiency.
    
    I port my code into an independent [git repo](https://github.com/yinxusen/gradient_descent_variants) so as to do experiments more easily, I'll move them back here recently.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10729146
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    +  private var numLocalIterations: Int = 1
    +
    +  /**
    +   * Set the number of local iterations. Default 1.
    +   */
    +  def setNumLocalIterations(numLocalIter: Int): this.type = {
    +    this.numLocalIterations = numLocalIter
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    +    : Array[Double] = {
    +
    +    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
    +        data,
    +        gradient,
    +        updater,
    +        stepSize,
    +        numIterations,
    +        numLocalIterations,
    +        regParam,
    +        miniBatchFraction,
    +        initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +// Top-level method to run gradient descent.
    +object GradientDescentWithLocalUpdate extends Logging {
    +   /**
    +   * Run BSP+ gradient descent in parallel using mini batches.
    --- End diff --
    
    I agree. We need a reference here or at least explain `BSP+`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-38298529
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-37930965
  
    In fact, if we set the `numInnerIteration = 1`, which is the default setting, then the `GradientDescentWithLocalUpdate` is identical to `GradientDescent`. However, I think it is better if we have opportunity to add new updater easily. End-user may have the requirement, for adding new customized updater.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-37903948
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-37925067
  
    Broad question: can this simply replace the existing implementation, if it's better? I'd suggest it is important to not let a bunch of different implementations proliferate, but unify them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-37940020
  
    @srowen Forgot to mention, [@etrain's comment](https://github.com/apache/incubator-spark/pull/407#issuecomment-33299956) should be one of the reasons why this PR doesn't try to replace the original one. BTW, basically I'm not an ML guy, so please ignore me if I'm saying rubbish :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10695946
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala ---
    @@ -32,10 +32,10 @@ import scala.collection.mutable.ArrayBuffer
     class GradientDescent(var gradient: Gradient, var updater: Updater)
       extends Optimizer with Logging
     {
    --- End diff --
    
    Move right brace to the end of the last line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10735781
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    +  private var numLocalIterations: Int = 1
    +
    +  /**
    +   * Set the number of local iterations. Default 1.
    +   */
    +  def setNumLocalIterations(numLocalIter: Int): this.type = {
    +    this.numLocalIterations = numLocalIter
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    +    : Array[Double] = {
    +
    +    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
    +        data,
    +        gradient,
    +        updater,
    +        stepSize,
    +        numIterations,
    +        numLocalIterations,
    +        regParam,
    +        miniBatchFraction,
    +        initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +// Top-level method to run gradient descent.
    +object GradientDescentWithLocalUpdate extends Logging {
    +   /**
    +   * Run BSP+ gradient descent in parallel using mini batches.
    +   *
    +   * @param data - Input data for SGD. RDD of form (label, [feature values]).
    +   * @param gradient - Gradient object that will be used to compute the gradient.
    +   * @param updater - Updater object that will be used to update the model.
    +   * @param stepSize - stepSize to be used during update.
    +   * @param numOuterIterations - number of outer iterations that SGD should be run.
    +   * @param numInnerIterations - number of inner iterations that SGD should be run.
    +   * @param regParam - regularization parameter
    +   * @param miniBatchFraction - fraction of the input data set that should be used for
    +   *                            one iteration of SGD. Default value 1.0.
    +   *
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the stochastic
    +   *         loss computed for every iteration.
    +   */
    +  def runMiniBatchSGD(
    +      data: RDD[(Double, Array[Double])],
    +      gradient: Gradient,
    +      updater: Updater,
    +      stepSize: Double,
    +      numOuterIterations: Int,
    +      numInnerIterations: Int,
    +      regParam: Double,
    +      miniBatchFraction: Double,
    +      initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
    +
    +    val stochasticLossHistory = new ArrayBuffer[Double](numOuterIterations)
    +
    +    val numExamples: Long = data.count()
    +    val numPartition = data.partitions.length
    +    val miniBatchSize = numExamples * miniBatchFraction / numPartition
    +
    +    // Initialize weights as a column vector
    +    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights: _*)
    +    var regVal = 0.0
    +
    +    for (i <- 1 to numOuterIterations) {
    +      val weightsAndLosses = data.mapPartitions { iter =>
    +        var iterReserved = iter
    +        val localLossHistory = new ArrayBuffer[Double](numInnerIterations)
    +
    +        for (j <- 1 to numInnerIterations) {
    +          val (iterCurrent, iterNext) = iterReserved.duplicate
    --- End diff --
    
    Good question, I look into [`duplicate`](https://github.com/scala/scala/blob/master/src/library/scala/collection/Iterator.scala#L1060) method just now. It uses a `scala.collection.mutuable.Queue` to mimic an `iterator`, and the elements iterated by one iterator but not yet by the other is stored there. I am shocked by that...
    
    I have no idea of the memory cost by the `Queue`, but it seems the only way to duplicate an `iterator`. We have already tested before that the method is really fast than `iterator.toArray`. @liancheng Do you know about that?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10695930
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala ---
    @@ -32,10 +32,10 @@ import scala.collection.mutable.ArrayBuffer
     class GradientDescent(var gradient: Gradient, var updater: Updater)
       extends Optimizer with Logging
     {
    -  private var stepSize: Double = 1.0
    -  private var numIterations: Int = 100
    -  private var regParam: Double = 0.0
    -  private var miniBatchFraction: Double = 1.0
    +   protected var stepSize: Double = 1.0
    +   protected var numIterations: Int = 100
    +   protected var regParam: Double = 0.0
    +   protected var miniBatchFraction: Double = 1.0
    --- End diff --
    
    Indentation error, should be 2 spaces.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-38296681
  
    I use the new method to enlarge local update. Test on SVM and LogisticRegression looks as good as the first version, without the worry of OOM. This method can get better result in shorter time, especially when the dataset is too large to cache in memory.
    
    I think this method is much more like the method provided [here](http://arxiv.org/pdf/1209.2191.pdf) in section 3. I'm not mentioned that it is a better way, but the original `GradientDescent` is somewhat like an elephant pulling a small carriage.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10739905
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    +  private var numLocalIterations: Int = 1
    +
    +  /**
    +   * Set the number of local iterations. Default 1.
    +   */
    +  def setNumLocalIterations(numLocalIter: Int): this.type = {
    +    this.numLocalIterations = numLocalIter
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    +    : Array[Double] = {
    +
    +    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
    +        data,
    +        gradient,
    +        updater,
    +        stepSize,
    +        numIterations,
    +        numLocalIterations,
    +        regParam,
    +        miniBatchFraction,
    +        initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +// Top-level method to run gradient descent.
    +object GradientDescentWithLocalUpdate extends Logging {
    +   /**
    +   * Run BSP+ gradient descent in parallel using mini batches.
    +   *
    +   * @param data - Input data for SGD. RDD of form (label, [feature values]).
    +   * @param gradient - Gradient object that will be used to compute the gradient.
    +   * @param updater - Updater object that will be used to update the model.
    +   * @param stepSize - stepSize to be used during update.
    +   * @param numOuterIterations - number of outer iterations that SGD should be run.
    +   * @param numInnerIterations - number of inner iterations that SGD should be run.
    +   * @param regParam - regularization parameter
    +   * @param miniBatchFraction - fraction of the input data set that should be used for
    +   *                            one iteration of SGD. Default value 1.0.
    +   *
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the stochastic
    +   *         loss computed for every iteration.
    +   */
    +  def runMiniBatchSGD(
    +      data: RDD[(Double, Array[Double])],
    +      gradient: Gradient,
    +      updater: Updater,
    +      stepSize: Double,
    +      numOuterIterations: Int,
    +      numInnerIterations: Int,
    +      regParam: Double,
    +      miniBatchFraction: Double,
    +      initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
    +
    +    val stochasticLossHistory = new ArrayBuffer[Double](numOuterIterations)
    +
    +    val numExamples: Long = data.count()
    +    val numPartition = data.partitions.length
    +    val miniBatchSize = numExamples * miniBatchFraction / numPartition
    +
    +    // Initialize weights as a column vector
    +    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights: _*)
    +    var regVal = 0.0
    +
    +    for (i <- 1 to numOuterIterations) {
    +      val weightsAndLosses = data.mapPartitions { iter =>
    +        var iterReserved = iter
    +        val localLossHistory = new ArrayBuffer[Double](numInnerIterations)
    +
    +        for (j <- 1 to numInnerIterations) {
    +          val (iterCurrent, iterNext) = iterReserved.duplicate
    --- End diff --
    
    @yinxusen I think this approach will certainly run OOM if data is too big to fit into memory. You can set a small executor memory and test some data without caching.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-38298670
  
    Merged build finished.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10738514
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    +  private var numLocalIterations: Int = 1
    +
    +  /**
    +   * Set the number of local iterations. Default 1.
    +   */
    +  def setNumLocalIterations(numLocalIter: Int): this.type = {
    +    this.numLocalIterations = numLocalIter
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    +    : Array[Double] = {
    +
    +    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
    +        data,
    +        gradient,
    +        updater,
    +        stepSize,
    +        numIterations,
    +        numLocalIterations,
    +        regParam,
    +        miniBatchFraction,
    +        initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +// Top-level method to run gradient descent.
    +object GradientDescentWithLocalUpdate extends Logging {
    +   /**
    +   * Run BSP+ gradient descent in parallel using mini batches.
    +   *
    +   * @param data - Input data for SGD. RDD of form (label, [feature values]).
    +   * @param gradient - Gradient object that will be used to compute the gradient.
    +   * @param updater - Updater object that will be used to update the model.
    +   * @param stepSize - stepSize to be used during update.
    +   * @param numOuterIterations - number of outer iterations that SGD should be run.
    +   * @param numInnerIterations - number of inner iterations that SGD should be run.
    +   * @param regParam - regularization parameter
    +   * @param miniBatchFraction - fraction of the input data set that should be used for
    +   *                            one iteration of SGD. Default value 1.0.
    +   *
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the stochastic
    +   *         loss computed for every iteration.
    +   */
    +  def runMiniBatchSGD(
    +      data: RDD[(Double, Array[Double])],
    +      gradient: Gradient,
    +      updater: Updater,
    +      stepSize: Double,
    +      numOuterIterations: Int,
    +      numInnerIterations: Int,
    +      regParam: Double,
    +      miniBatchFraction: Double,
    +      initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
    +
    +    val stochasticLossHistory = new ArrayBuffer[Double](numOuterIterations)
    +
    +    val numExamples: Long = data.count()
    +    val numPartition = data.partitions.length
    +    val miniBatchSize = numExamples * miniBatchFraction / numPartition
    +
    +    // Initialize weights as a column vector
    +    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights: _*)
    +    var regVal = 0.0
    +
    +    for (i <- 1 to numOuterIterations) {
    +      val weightsAndLosses = data.mapPartitions { iter =>
    +        var iterReserved = iter
    +        val localLossHistory = new ArrayBuffer[Double](numInnerIterations)
    +
    +        for (j <- 1 to numInnerIterations) {
    +          val (iterCurrent, iterNext) = iterReserved.duplicate
    --- End diff --
    
    We're using `Iterator.duplicate` to iterate the dataset multiple times without calling `Iterator.toArray`. According to implementation of `duplicate`, it does consume more memory and generates more temporary objects. I didn't notice that before. But according to previous experiments, `duplicate` is much more GC-friendly than `toArray`. I think the reason is that the underlying implementation of the `mutable.Queue` used in `duplicate` is actually a `mutable.LinkedList`, which doesn't require large amount of *continuous* memory, and thus may trigger full-GC less frequently.
    
    If my guess is right, instead of using `duplicate` maybe we can simply call `Iterator.toList` reduce full-GC frequency?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen closed the pull request at:

    https://github.com/apache/spark/pull/166


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10696002
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    --- End diff --
    
    Please reorder the imports according to [Spark coding convention](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-40922790
  
    I'd like to close the PR, for the offline discussion with @mengxr . The code will be stay in my github repo, for those who still interested in it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-37906512
  
    Merged build finished.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-38345315
  
    I have test the original/1-version/2-version LR and SVM, here is the result:
    
    (Note that original version runs 100 iterations, while the other two run 10 iterations with 10 local iterations.)
    
    | Type         | Time           | Last 10 losses  |
    | ------------- |:--------------:| -------------------:|
    | original LR     | 89.348 | 0.638 - 0.636 |
    | 1-version LR  | 13.094      |   1.769 - 0.636 |
    | **2-version LR**      | **10.747** |   **1.618 - 0.631** |
    | original SVM | 88.708 | 0.947 - 0.947 |
    | 1-version SVM | 13.062    |  2.127 - 0.694 |
    | **2-version SVM** | **10.829** |   **1.943 - 0.691** |
    
    There are 3 updaters : L1-updater, L2-updater, and simple updater, and 3 gradients: Logistic, Square and Hinge. SVM uses Hinge+L2, LR uses Logistic+simple, Lasso uses Square+L1.
    
    But I encounter some difficulties in Lasso, I am still trying to fix them.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10787371
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    +  private var numLocalIterations: Int = 1
    +
    +  /**
    +   * Set the number of local iterations. Default 1.
    +   */
    +  def setNumLocalIterations(numLocalIter: Int): this.type = {
    +    this.numLocalIterations = numLocalIter
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    +    : Array[Double] = {
    +
    +    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
    +        data,
    +        gradient,
    +        updater,
    +        stepSize,
    +        numIterations,
    +        numLocalIterations,
    +        regParam,
    +        miniBatchFraction,
    +        initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +// Top-level method to run gradient descent.
    +object GradientDescentWithLocalUpdate extends Logging {
    +   /**
    +   * Run BSP+ gradient descent in parallel using mini batches.
    +   *
    +   * @param data - Input data for SGD. RDD of form (label, [feature values]).
    +   * @param gradient - Gradient object that will be used to compute the gradient.
    +   * @param updater - Updater object that will be used to update the model.
    +   * @param stepSize - stepSize to be used during update.
    +   * @param numOuterIterations - number of outer iterations that SGD should be run.
    +   * @param numInnerIterations - number of inner iterations that SGD should be run.
    +   * @param regParam - regularization parameter
    +   * @param miniBatchFraction - fraction of the input data set that should be used for
    +   *                            one iteration of SGD. Default value 1.0.
    +   *
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the stochastic
    +   *         loss computed for every iteration.
    +   */
    +  def runMiniBatchSGD(
    +      data: RDD[(Double, Array[Double])],
    +      gradient: Gradient,
    +      updater: Updater,
    +      stepSize: Double,
    +      numOuterIterations: Int,
    +      numInnerIterations: Int,
    +      regParam: Double,
    +      miniBatchFraction: Double,
    +      initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
    +
    +    val stochasticLossHistory = new ArrayBuffer[Double](numOuterIterations)
    +
    +    val numExamples: Long = data.count()
    +    val numPartition = data.partitions.length
    +    val miniBatchSize = numExamples * miniBatchFraction / numPartition
    +
    +    // Initialize weights as a column vector
    +    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights: _*)
    +    var regVal = 0.0
    +
    +    for (i <- 1 to numOuterIterations) {
    +      val weightsAndLosses = data.mapPartitions { iter =>
    +        var iterReserved = iter
    +        val localLossHistory = new ArrayBuffer[Double](numInnerIterations)
    +
    +        for (j <- 1 to numInnerIterations) {
    +          val (iterCurrent, iterNext) = iterReserved.duplicate
    --- End diff --
    
    @mengxr, I absolutely agree with you. I am trying another way now, and will have a test result tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-38298530
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10695955
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    --- End diff --
    
    Move the right brace to the end of the last line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10696089
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    +  private var numLocalIterations: Int = 1
    +
    +  /**
    +   * Set the number of local iterations. Default 1.
    +   */
    +  def setNumLocalIterations(numLocalIter: Int): this.type = {
    +    this.numLocalIterations = numLocalIter
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    +    : Array[Double] = {
    +
    +    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
    +        data,
    +        gradient,
    +        updater,
    +        stepSize,
    +        numIterations,
    +        numLocalIterations,
    +        regParam,
    +        miniBatchFraction,
    +        initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +// Top-level method to run gradient descent.
    +object GradientDescentWithLocalUpdate extends Logging {
    +   /**
    +   * Run BSP+ gradient descent in parallel using mini batches.
    +   *
    +   * @param data - Input data for SGD. RDD of form (label, [feature values]).
    +   * @param gradient - Gradient object that will be used to compute the gradient.
    +   * @param updater - Updater object that will be used to update the model.
    +   * @param stepSize - stepSize to be used during update.
    +   * @param numOuterIterations - number of outer iterations that SGD should be run.
    +   * @param numInnerIterations - number of inner iterations that SGD should be run.
    +   * @param regParam - regularization parameter
    +   * @param miniBatchFraction - fraction of the input data set that should be used for
    +   *                            one iteration of SGD. Default value 1.0.
    +   *
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the stochastic
    +   *         loss computed for every iteration.
    +   */
    +  def runMiniBatchSGD(
    +      data: RDD[(Double, Array[Double])],
    +      gradient: Gradient,
    +      updater: Updater,
    +      stepSize: Double,
    +      numOuterIterations: Int,
    +      numInnerIterations: Int,
    +      regParam: Double,
    +      miniBatchFraction: Double,
    +      initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
    --- End diff --
    
    Remove the space before the colon.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10696461
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    +  private var numLocalIterations: Int = 1
    +
    +  /**
    +   * Set the number of local iterations. Default 1.
    +   */
    +  def setNumLocalIterations(numLocalIter: Int): this.type = {
    +    this.numLocalIterations = numLocalIter
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    +    : Array[Double] = {
    +
    +    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
    +        data,
    +        gradient,
    +        updater,
    +        stepSize,
    +        numIterations,
    +        numLocalIterations,
    +        regParam,
    +        miniBatchFraction,
    +        initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +// Top-level method to run gradient descent.
    +object GradientDescentWithLocalUpdate extends Logging {
    +   /**
    +   * Run BSP+ gradient descent in parallel using mini batches.
    +   *
    +   * @param data - Input data for SGD. RDD of form (label, [feature values]).
    +   * @param gradient - Gradient object that will be used to compute the gradient.
    +   * @param updater - Updater object that will be used to update the model.
    +   * @param stepSize - stepSize to be used during update.
    +   * @param numOuterIterations - number of outer iterations that SGD should be run.
    +   * @param numInnerIterations - number of inner iterations that SGD should be run.
    +   * @param regParam - regularization parameter
    +   * @param miniBatchFraction - fraction of the input data set that should be used for
    +   *                            one iteration of SGD. Default value 1.0.
    +   *
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the stochastic
    +   *         loss computed for every iteration.
    +   */
    +  def runMiniBatchSGD(
    +      data: RDD[(Double, Array[Double])],
    +      gradient: Gradient,
    +      updater: Updater,
    +      stepSize: Double,
    +      numOuterIterations: Int,
    +      numInnerIterations: Int,
    +      regParam: Double,
    +      miniBatchFraction: Double,
    +      initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
    +
    +    val stochasticLossHistory = new ArrayBuffer[Double](numOuterIterations)
    +
    +    val numExamples: Long = data.count()
    +    val numPartition = data.partitions.length
    +    val miniBatchSize = numExamples * miniBatchFraction / numPartition
    +
    +    // Initialize weights as a column vector
    +    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights: _*)
    +    var regVal = 0.0
    +
    +    for (i <- 1 to numOuterIterations) {
    +      val weightsAndLosses = data.mapPartitions { iter =>
    +        var iterReserved = iter
    +        val localLossHistory = new ArrayBuffer[Double](numInnerIterations)
    +
    +        for (j <- 1 to numInnerIterations) {
    +          val (iterCurrent, iterNext) = iterReserved.duplicate
    +          val rand = new Random(42 + i * numOuterIterations + j)
    +          val sampled = iterCurrent.filter(x => rand.nextDouble() <= miniBatchFraction)
    +          val (gradientSum, lossSum) = sampled.map { case (y, features) =>
    +            val featuresCol = new DoubleMatrix(features.length, 1, features: _*)
    +            val (grad, loss) = gradient.compute(featuresCol, y, weights)
    +            (grad, loss)
    --- End diff --
    
    We can simply return `gradient.compute(...)` here without introducing `grad` and `loss`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10729445
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    +  private var numLocalIterations: Int = 1
    +
    +  /**
    +   * Set the number of local iterations. Default 1.
    +   */
    +  def setNumLocalIterations(numLocalIter: Int): this.type = {
    +    this.numLocalIterations = numLocalIter
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    +    : Array[Double] = {
    +
    +    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
    +        data,
    +        gradient,
    +        updater,
    +        stepSize,
    +        numIterations,
    +        numLocalIterations,
    +        regParam,
    +        miniBatchFraction,
    +        initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +// Top-level method to run gradient descent.
    +object GradientDescentWithLocalUpdate extends Logging {
    +   /**
    +   * Run BSP+ gradient descent in parallel using mini batches.
    +   *
    +   * @param data - Input data for SGD. RDD of form (label, [feature values]).
    +   * @param gradient - Gradient object that will be used to compute the gradient.
    +   * @param updater - Updater object that will be used to update the model.
    +   * @param stepSize - stepSize to be used during update.
    +   * @param numOuterIterations - number of outer iterations that SGD should be run.
    +   * @param numInnerIterations - number of inner iterations that SGD should be run.
    +   * @param regParam - regularization parameter
    +   * @param miniBatchFraction - fraction of the input data set that should be used for
    +   *                            one iteration of SGD. Default value 1.0.
    +   *
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the stochastic
    +   *         loss computed for every iteration.
    +   */
    +  def runMiniBatchSGD(
    +      data: RDD[(Double, Array[Double])],
    +      gradient: Gradient,
    +      updater: Updater,
    +      stepSize: Double,
    +      numOuterIterations: Int,
    +      numInnerIterations: Int,
    +      regParam: Double,
    +      miniBatchFraction: Double,
    +      initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
    +
    +    val stochasticLossHistory = new ArrayBuffer[Double](numOuterIterations)
    +
    +    val numExamples: Long = data.count()
    +    val numPartition = data.partitions.length
    +    val miniBatchSize = numExamples * miniBatchFraction / numPartition
    +
    +    // Initialize weights as a column vector
    +    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights: _*)
    +    var regVal = 0.0
    +
    +    for (i <- 1 to numOuterIterations) {
    +      val weightsAndLosses = data.mapPartitions { iter =>
    +        var iterReserved = iter
    +        val localLossHistory = new ArrayBuffer[Double](numInnerIterations)
    +
    +        for (j <- 1 to numInnerIterations) {
    +          val (iterCurrent, iterNext) = iterReserved.duplicate
    --- End diff --
    
    I'm not very familiar with how `duplicate` is implemented. Scala doc says "The implementation may allocate temporary storage for elements iterated by one iterator but not yet by the other." Is there a risk of running out of memory here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10696799
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.scalatest.FunSuite
    +import org.scalatest.matchers.ShouldMatchers
    +
    +import org.apache.spark.mllib.regression._
    +import org.apache.spark.mllib.util.LocalSparkContext
    +
    +class GradientDescentWithLocalUpdateSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
    --- End diff --
    
    Also, seems that there's no need to mixin `ShouldMatchers` since the test suite doesn't use related APIs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/166#discussion_r10734823
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +
    +import org.jblas.DoubleMatrix
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +/**
    + * Class used to solve an optimization problem using Gradient Descent.
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
    +  extends GradientDescent(gradient, updater) with Logging
    +{
    +  private var numLocalIterations: Int = 1
    +
    +  /**
    +   * Set the number of local iterations. Default 1.
    +   */
    +  def setNumLocalIterations(numLocalIter: Int): this.type = {
    +    this.numLocalIterations = numLocalIter
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    +    : Array[Double] = {
    +
    +    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
    +        data,
    +        gradient,
    +        updater,
    +        stepSize,
    +        numIterations,
    +        numLocalIterations,
    +        regParam,
    +        miniBatchFraction,
    +        initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +// Top-level method to run gradient descent.
    +object GradientDescentWithLocalUpdate extends Logging {
    +   /**
    +   * Run BSP+ gradient descent in parallel using mini batches.
    +   *
    +   * @param data - Input data for SGD. RDD of form (label, [feature values]).
    +   * @param gradient - Gradient object that will be used to compute the gradient.
    +   * @param updater - Updater object that will be used to update the model.
    +   * @param stepSize - stepSize to be used during update.
    +   * @param numOuterIterations - number of outer iterations that SGD should be run.
    +   * @param numInnerIterations - number of inner iterations that SGD should be run.
    +   * @param regParam - regularization parameter
    +   * @param miniBatchFraction - fraction of the input data set that should be used for
    +   *                            one iteration of SGD. Default value 1.0.
    +   *
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the stochastic
    +   *         loss computed for every iteration.
    +   */
    +  def runMiniBatchSGD(
    +      data: RDD[(Double, Array[Double])],
    +      gradient: Gradient,
    +      updater: Updater,
    +      stepSize: Double,
    +      numOuterIterations: Int,
    +      numInnerIterations: Int,
    +      regParam: Double,
    +      miniBatchFraction: Double,
    +      initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
    +
    +    val stochasticLossHistory = new ArrayBuffer[Double](numOuterIterations)
    +
    +    val numExamples: Long = data.count()
    +    val numPartition = data.partitions.length
    +    val miniBatchSize = numExamples * miniBatchFraction / numPartition
    +
    +    // Initialize weights as a column vector
    +    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights: _*)
    +    var regVal = 0.0
    +
    +    for (i <- 1 to numOuterIterations) {
    +      val weightsAndLosses = data.mapPartitions { iter =>
    +        var iterReserved = iter
    +        val localLossHistory = new ArrayBuffer[Double](numInnerIterations)
    +
    +        for (j <- 1 to numInnerIterations) {
    +          val (iterCurrent, iterNext) = iterReserved.duplicate
    +          val rand = new Random(42 + i * numOuterIterations + j)
    +          val sampled = iterCurrent.filter(x => rand.nextDouble() <= miniBatchFraction)
    +          val (gradientSum, lossSum) = sampled.map { case (y, features) =>
    +            val featuresCol = new DoubleMatrix(features.length, 1, features: _*)
    +            val (grad, loss) = gradient.compute(featuresCol, y, weights)
    +            (grad, loss)
    +          }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
    --- End diff --
    
    Hmm... Sounds good. I should take care of it, especially when the data in each partition is little and the `miniBatchFraction` is low, a blank partition could occur.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [MLLIB-28] An optimized GradientDescent ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/166#issuecomment-37903949
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---