Posted to user@spark.apache.org by "EcoMotto Inc." <ec...@gmail.com> on 2015/03/16 23:19:23 UTC

Can LBFGS be used on streaming data?

Hello,

I am new to the Spark Streaming API.

I wanted to ask whether I can apply LBFGS (with LeastSquaresGradient) to
streaming data. Currently I am using foreachRDD to iterate over the DStream,
and I am generating a model based on each RDD. Am I doing anything
logically wrong here?
Thank you.

Sample Code:

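import scala.collection.mutable.ArrayBuffer
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SimpleUpdater}
import org.apache.spark.mllib.regression.LinearRegressionModel

val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
// Element 0 of the weight vector is treated as the intercept below.
var initialWeights =
  Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
var isFirst = true
var model = new LinearRegressionModel(null, 1.0) // placeholder, replaced on the first batch

parsedData.foreachRDD { rdd =>
  if (isFirst) {
    // First batch: start from the random initial weights.
    val weights = algorithm.optimize(rdd, initialWeights)
    val w = weights.toArray
    val intercept = w.head
    model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
    isFirst = false
  } else {
    // Later batches: start from the previous model (intercept first).
    val ab = ArrayBuffer[Double]()
    ab.insert(0, model.intercept)
    ab.appendAll(model.weights.toArray)
    println("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
    initialWeights = Vectors.dense(ab.toArray)
    println("Initial Weights: " + initialWeights)
    val weights = algorithm.optimize(rdd, initialWeights)
    val w = weights.toArray
    val intercept = w.head
    model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
  }
}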



Best Regards,
Arunkumar

Re: Can LBFGS be used on streaming data?

Posted by "EcoMotto Inc." <ec...@gmail.com>.
Hello Jeremy,

Sorry for the delayed reply!

The first issue is resolved; I believe it was just a mismatch between the
production and consumption rates.

Regarding the second question, I am streaming the data from a file that
contains about 38k records. I send the records in the same sequence
as I read them from the file, but I get different weights each time.
Maybe it has something to do with how the DStreams are being processed.

Can you suggest a solution for this case? My requirement is that my
program must generate the same weights for both static and streaming data.
Thank you for your help!
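
For the requirement above (identical weights for static and streaming data), one option that is not discussed in this thread is to avoid iterative optimization altogether: accumulate the least-squares sufficient statistics X^T X and X^T y across batches and solve the normal equations. The sketch below is plain Scala without Spark, and every name in it (NormalEquationsSketch, Stats, fit) is hypothetical. It assumes the number of features is small enough to hold X^T X in memory, that the matrix is well conditioned, and that records are added in the same order in both cases.

```scala
object NormalEquationsSketch {
  type Point = (Double, Array[Double]) // (label, features incl. a leading 1.0 for the intercept)

  // Running sums X^T X and X^T y for least squares.
  final case class Stats(xtx: Array[Array[Double]], xty: Array[Double]) {
    def add(y: Double, x: Array[Double]): Stats = {
      val n = x.length
      Stats(
        Array.tabulate(n, n)((i, j) => xtx(i)(j) + x(i) * x(j)),
        Array.tabulate(n)(i => xty(i) + x(i) * y))
    }
  }

  def empty(n: Int): Stats = Stats(Array.fill(n, n)(0.0), Array.fill(n)(0.0))

  // Solve (X^T X) w = X^T y by Gaussian elimination with partial pivoting.
  def solve(s: Stats): Array[Double] = {
    val n = s.xty.length
    val a = Array.tabulate(n)(i => s.xtx(i) :+ s.xty(i)) // augmented matrix
    for (col <- 0 until n) {
      val pivot = (col until n).maxBy(r => math.abs(a(r)(col)))
      val tmp = a(col); a(col) = a(pivot); a(pivot) = tmp
      for (r <- col + 1 until n) {
        val f = a(r)(col) / a(col)(col)
        for (c <- col to n) a(r)(c) -= f * a(col)(c)
      }
    }
    // Back substitution.
    val w = Array.fill(n)(0.0)
    for (i <- n - 1 to 0 by -1) {
      val known = (i + 1 until n).map(j => a(i)(j) * w(j)).sum
      w(i) = (a(i)(n) - known) / a(i)(i)
    }
    w
  }

  // Fold every batch into the running statistics, then solve once.
  def fit(batches: Seq[Seq[Point]], n: Int): Array[Double] = {
    val stats = batches.foldLeft(empty(n)) { (s, batch) =>
      batch.foldLeft(s) { case (acc, (y, x)) => acc.add(y, x) }
    }
    solve(stats)
  }

  def main(args: Array[String]): Unit = {
    // Toy data generated from y = 1 + 2x.
    val data = Seq(0.0, 1.0, 2.0, 3.0).map(x => (1.0 + 2.0 * x, Array(1.0, x)))
    val staticW = fit(Seq(data), 2)                 // all data at once
    val streamedW = fit(data.grouped(2).toList, 2)  // same data in two batches
    println(staticW.mkString(", "))
    println(streamedW.mkString(", "))
  }
}
```

Because the sums are built in the same record order either way, the batch boundaries do not affect the result in this sketch. If the order of records changes, floating-point rounding can still introduce differences in the last digits.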

Best Regards,
Arunkumar


On Thu, Mar 19, 2015 at 9:25 PM, Jeremy Freeman <fr...@gmail.com>
wrote:

> Regarding the first question, can you say more about how you are loading
> your data? And what is the size of the data set? And is that the only error
> you see, and do you only see it in the streaming version?
>
> For the second question, there are a couple of reasons the weights might
> differ slightly; it depends on exactly how you set up the comparison. When
> you split it into 5, were those the same 5 chunks of data you used for the
> streaming case? And were they presented to the optimizer in the same order?
> A difference in either could produce small differences in the resulting
> weights, but that doesn’t mean it’s doing anything wrong.
>
> -------------------------
> jeremyfreeman.net
> @thefreemanlab
>

Re: Can LBFGS be used on streaming data?

Posted by Jeremy Freeman <fr...@gmail.com>.
Regarding the first question, can you say more about how you are loading your data? And what is the size of the data set? And is that the only error you see, and do you only see it in the streaming version?

For the second question, there are a couple of reasons the weights might differ slightly; it depends on exactly how you set up the comparison. When you split it into 5, were those the same 5 chunks of data you used for the streaming case? And were they presented to the optimizer in the same order? A difference in either could produce small differences in the resulting weights, but that doesn’t mean it’s doing anything wrong.
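
The effect of batch order can be seen in a toy setting. The sketch below is plain Scala without Spark; it takes a single gradient step per batch instead of running L-BFGS, so the differences are exaggerated compared with the setup in this thread. The names (BatchOrderSketch, step, run) and the data are hypothetical.

```scala
object BatchOrderSketch {
  type Point = (Double, Double) // (label y, feature x) for the model y ~ w * x

  // One gradient step on the mean squared error of a single batch.
  def step(batch: Seq[Point], w: Double, stepSize: Double): Double = {
    val grad = batch.map { case (y, x) => (w * x - y) * x }.sum / batch.size
    w - stepSize * grad
  }

  // Process the batches in the given order, carrying the weight forward.
  def run(batches: Seq[Seq[Point]], w0: Double, stepSize: Double): Double =
    batches.foldLeft(w0)((w, b) => step(b, w, stepSize))

  def main(args: Array[String]): Unit = {
    val a = Seq((1.0, 1.0)) // on its own, best fit is w = 1
    val b = Seq((3.0, 1.0)) // on its own, best fit is w = 3
    println(run(Seq(a, b), 0.0, 0.5)) // prints 1.75
    println(run(Seq(b, a), 0.0, 0.5)) // prints 1.25
  }
}
```

The same two batches give different weights depending on which one is seen last, which is the kind of order dependence described above.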

-------------------------
jeremyfreeman.net
@thefreemanlab

On Mar 17, 2015, at 6:19 PM, EcoMotto Inc. <ec...@gmail.com> wrote:

> Hello Jeremy,
> 
> Thank you for your reply.
> 
> When I run this code on the local machine on streaming data, it keeps giving me this error:
> WARN TaskSetManager: Lost task 2.0 in stage 211.0 (TID 4138, localhost): java.io.FileNotFoundException: /tmp/spark-local-20150316165742-9ac0/27/shuffle_102_2_0.data (No such file or directory)
> 
> And when I execute the same code on static data after randomly splitting it into 5 sets, it gives me slightly different weights (the difference is in the decimals). I am still trying to analyse why this is happening.
> Any input on why this would be happening?
> 
> Best Regards,
> Arunkumar
> 
> 


Re: Can LBFGS be used on streaming data?

Posted by "EcoMotto Inc." <ec...@gmail.com>.
Hello Jeremy,

Thank you for your reply.

When I run this code on the local machine on streaming data, it
keeps giving me this error:
WARN TaskSetManager: Lost task 2.0 in stage 211.0 (TID 4138, localhost):
java.io.FileNotFoundException:
/tmp/spark-local-20150316165742-9ac0/27/shuffle_102_2_0.data (No such file
or directory)

And when I execute the same code on static data after randomly splitting
it into 5 sets, it gives me slightly different weights (the difference is
in the decimals). I am still trying to analyse why this is happening.
Any input on why this would be happening?

Best Regards,
Arunkumar


On Tue, Mar 17, 2015 at 11:32 AM, Jeremy Freeman <fr...@gmail.com>
wrote:

> Hi Arunkumar,
>
> That looks like it should work. Logically, it’s similar to the
> implementation used by StreamingLinearRegressionWithSGD and
> StreamingLogisticRegressionWithSGD; see this class:
>
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
>
> which exposes the kind of operation you’re describing (for any linear
> method).
>
> The nice thing about the gradient-based methods is that they can use
> existing MLlib optimization routines in this fairly direct way. Other
> methods (such as KMeans) require a bit more reengineering.
>
> — Jeremy
>
> -------------------------
> jeremyfreeman.net
> @thefreemanlab
>

Re: Can LBFGS be used on streaming data?

Posted by Jeremy Freeman <fr...@gmail.com>.
Hi Arunkumar,

That looks like it should work. Logically, it’s similar to the implementation used by StreamingLinearRegressionWithSGD and StreamingLogisticRegressionWithSGD; see this class:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala

which exposes the kind of operation you’re describing (for any linear method).

The nice thing about the gradient-based methods is that they can use existing MLlib optimization routines in this fairly direct way. Other methods (such as KMeans) require a bit more reengineering.
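
The per-batch pattern described above, where the optimizer for each batch starts from the weights left by the previous batch, can be sketched without Spark. This is only an illustration: plain gradient descent stands in for the MLlib optimizer, and the names (WarmStartSketch, optimize, trainOnBatches), the iteration count, and the step size are all hypothetical choices.

```scala
object WarmStartSketch {
  type Point = (Double, Array[Double]) // (label, features incl. a leading 1.0 for the intercept)

  // Gradient of 0.5 * mean squared error for weights w on one batch.
  def gradient(batch: Seq[Point], w: Array[Double]): Array[Double] = {
    val g = Array.fill(w.length)(0.0)
    for ((y, x) <- batch) {
      val err = x.indices.map(i => w(i) * x(i)).sum - y
      for (i <- w.indices) g(i) += err * x(i) / batch.size
    }
    g
  }

  // Plain gradient descent, standing in for the real optimizer (this is not L-BFGS).
  def optimize(batch: Seq[Point], init: Array[Double], iters: Int, step: Double): Array[Double] =
    (1 to iters).foldLeft(init) { (w, _) =>
      val g = gradient(batch, w)
      w.indices.map(i => w(i) - step * g(i)).toArray
    }

  // Each batch is optimized starting from the previous batch's weights.
  def trainOnBatches(batches: Seq[Seq[Point]], init: Array[Double]): Array[Double] =
    batches.foldLeft(init)((w, batch) => optimize(batch, w, iters = 200, step = 0.5))

  def main(args: Array[String]): Unit = {
    // Toy data generated from y = 1 + 2x, split into two batches.
    val batches = Seq(
      Seq((1.0, Array(1.0, 0.0)), (3.0, Array(1.0, 1.0))),
      Seq((2.0, Array(1.0, 0.5)), (4.0, Array(1.0, 1.5))))
    println(trainOnBatches(batches, Array(0.0, 0.0)).mkString(", "))
  }
}
```

With noise-free data every batch has the same best fit, so the carried-over weights settle near intercept 1 and slope 2; with real data each batch pulls the weights toward its own optimum.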

— Jeremy

-------------------------
jeremyfreeman.net
@thefreemanlab



Re: Can LBFGS be used on streaming data?

Posted by "EcoMotto Inc." <ec...@gmail.com>.
Hello DB,

Thank you! Do you know how to run linear regression without SGD on
streaming data in Spark? I tried SGD, but because of the step size I was
not getting the expected weights.

Best Regards,
Arunkumar

On Wed, Mar 25, 2015 at 4:33 PM, DB Tsai <db...@dbtsai.com> wrote:

> Hi Arunkumar,
>
> I think L-BFGS will not work, since the L-BFGS algorithm assumes that the
> objective function stays the same (i.e., the data is the same) for the
> entire optimization process in order to construct the approximate
> Hessian matrix. In the streaming case the data keeps changing, so
> it will cause problems for the algorithm.
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> Blog: https://www.dbtsai.com
>
>

Re: Can LBFGS be used on streaming data?

Posted by DB Tsai <db...@dbtsai.com>.
Hi Arunkumar,

I think L-BFGS will not work, since the L-BFGS algorithm assumes that the
objective function stays the same (i.e., the data is the same) for the
entire optimization process in order to construct the approximate
Hessian matrix. In the streaming case the data keeps changing, so
it will cause problems for the algorithm.
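
A one-dimensional illustration of this concern, as a sketch rather than an L-BFGS implementation: quasi-Newton methods estimate curvature from the change in the gradient between two points. If the two gradients come from different batches, the estimate also picks up the change in the data. The names (CurvatureSketch, grad, secant) and the toy data are hypothetical.

```scala
object CurvatureSketch {
  type Point = (Double, Double) // (label y, feature x) for the model y ~ w * x

  // Gradient of 0.5 * mean((w * x - y)^2) with respect to w.
  def grad(batch: Seq[Point], w: Double): Double =
    batch.map { case (y, x) => (w * x - y) * x }.sum / batch.size

  // Secant curvature estimate from two gradient evaluations.
  def secant(g0: Double, g1: Double, w0: Double, w1: Double): Double =
    (g1 - g0) / (w1 - w0)

  def main(args: Array[String]): Unit = {
    val a = Seq((1.0, 1.0))
    val b = Seq((3.0, 1.0))
    val (w0, w1) = (0.0, 1.0)
    // Same batch at both points: recovers the true curvature mean(x^2) = 1.
    println(secant(grad(a, w0), grad(a, w1), w0, w1)) // prints 1.0
    // Batch changes between the two evaluations: the estimate turns negative.
    println(secant(grad(a, w0), grad(b, w1), w0, w1)) // prints -1.0
  }
}
```

Both batches define convex problems with curvature 1, yet mixing their gradients yields a negative estimate, which is the kind of inconsistency a Hessian approximation cannot use.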

Sincerely,

DB Tsai
-------------------------------------------------------
Blog: https://www.dbtsai.com


