You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Margus Roo <ma...@roo.ee> on 2015/03/14 08:05:28 UTC

Streaming linear regression example question

Hi

I try to understand example provided in 
https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html - 
Streaming linear regression

Code:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream

object StreamingLinReg {

   def main(args: Array[String]) {

     val conf = new 
SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
     val ssc = new StreamingContext(conf, Seconds(10))


     val trainingData = 
ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/").map(LabeledPoint.parse).cache()

     val testData = 
ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/").map(LabeledPoint.parse)

     val numFeatures = 3
     val model = new 
StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))

     model.trainOn(trainingData)
     model.predictOnValues(testData.map(lp => (lp.label, 
lp.features))).print()

     ssc.start()
     ssc.awaitTermination()

   }

}

Compiled code and run it
Put file contains
   (1.0,[2.0,2.0,2.0])
   (2.0,[3.0,3.0,3.0])
   (3.0,[4.0,4.0,4.0])
   (4.0,[5.0,5.0,5.0])
   (5.0,[6.0,6.0,6.0])
   (6.0,[7.0,7.0,7.0])
   (7.0,[8.0,8.0,8.0])
   (8.0,[9.0,9.0,9.0])
   (9.0,[10.0,10.0,10.0])
in to training directory.

I can see that models weight change:
15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: 
weights, [7.333333333333333,7.333333333333333,7.333333333333333]

No I can put what ever in to testing directory but I can not understand 
answer.
In example I can put the same file I used for training in to testing 
directory. File content is
   (1.0,[2.0,2.0,2.0])
   (2.0,[3.0,3.0,3.0])
   (3.0,[4.0,4.0,4.0])
   (4.0,[5.0,5.0,5.0])
   (5.0,[6.0,6.0,6.0])
   (6.0,[7.0,7.0,7.0])
   (7.0,[8.0,8.0,8.0])
   (8.0,[9.0,9.0,9.0])
   (9.0,[10.0,10.0,10.0])

And answer will be
(1.0,0.0)
(2.0,0.0)
(3.0,0.0)
(4.0,0.0)
(5.0,0.0)
(6.0,0.0)
(7.0,0.0)
(8.0,0.0)
(9.0,0.0)

And in case my file content is
   (0.0,[2.0,2.0,2.0])
   (0.0,[3.0,3.0,3.0])
   (0.0,[4.0,4.0,4.0])
   (0.0,[5.0,5.0,5.0])
   (0.0,[6.0,6.0,6.0])
   (0.0,[7.0,7.0,7.0])
   (0.0,[8.0,8.0,8.0])
   (0.0,[9.0,9.0,9.0])
   (0.0,[10.0,10.0,10.0])

the answer will be:
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)

I except to get label predicted by model.

-- 
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480


Re: Streaming linear regression example question

Posted by Margus Roo <ma...@roo.ee>.
Tnx for the workaround.

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 16/03/15 06:20, Jeremy Freeman wrote:
> Hi Margus, thanks for reporting this, I’ve been able to reproduce and 
> there does indeed appear to be a bug. I’ve created a JIRA and have a 
> fix ready, can hopefully include in 1.3.1.


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Streaming linear regression example question

Posted by Jeremy Freeman <fr...@gmail.com>.
Hi Margus, thanks for reporting this, I’ve been able to reproduce and there does indeed appear to be a bug. I’ve created a JIRA and have a fix ready, can hopefully include in 1.3.1.

In the meantime, you can get the desired result using transform:

> model.trainOn(trainingData)
> 
> testingData.transform { rdd =>
>       val latest = model.latestModel()
>       rdd.map(lp => (lp.label, latest.predict(lp.features)))
> }.print()

-------------------------
jeremyfreeman.net
@thefreemanlab

On Mar 15, 2015, at 2:56 PM, Margus Roo <ma...@roo.ee> wrote:

> Hi again
> 
> Tried the same examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala from 1.3.0
> and getting in case testing file content is:
>   (0.0,[3.0,4.0,3.0])
>   (0.0,[4.0,4.0,4.0])
>   (4.0,[5.0,5.0,5.0])
>   (5.0,[5.0,6.0,6.0])
>   (6.0,[7.0,4.0,7.0])
>   (7.0,[8.0,6.0,8.0])
>   (8.0,[44.0,9.0,9.0])
>   (9.0,[14.0,30.0,10.0])
> 
> and the answer:
> (0.0,0.0)
> (0.0,0.0)
> (0.0,0.0)
> (4.0,0.0)
> (5.0,0.0)
> (6.0,0.0)
> (7.0,0.0)
> (8.0,0.0)
> (9.0,0.0)
> 
> What is wrong?
> I can see that model's weights are changing in case I put new data into training dir.
> Margus (margusja) Roo
> http://margus.roo.ee
> skype: margusja
> +372 51 480
> On 14/03/15 09:05, Margus Roo wrote:
>> Hi
>> 
>> I try to understand example provided in https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html - Streaming linear regression
>> 
>> Code:
>> import org.apache.spark._
>> import org.apache.spark.streaming._
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.regression.LabeledPoint
>> import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.dstream.DStream
>> 
>> object StreamingLinReg {
>> 
>>   def main(args: Array[String]) {
>>   
>>     val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
>>     val ssc = new StreamingContext(conf, Seconds(10))
>>   
>>     
>>     val trainingData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/").map(LabeledPoint.parse).cache()
>>     
>>     val testData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/").map(LabeledPoint.parse)
>>     
>>     val numFeatures = 3
>>     val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))
>>     
>>     model.trainOn(trainingData)
>>     model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
>> 
>>     ssc.start()
>>     ssc.awaitTermination()
>>     
>>   }
>>     
>> }
>> 
>> Compiled code and run it
>> Put file contains
>>   (1.0,[2.0,2.0,2.0])
>>   (2.0,[3.0,3.0,3.0])
>>   (3.0,[4.0,4.0,4.0])
>>   (4.0,[5.0,5.0,5.0])
>>   (5.0,[6.0,6.0,6.0])
>>   (6.0,[7.0,7.0,7.0])
>>   (7.0,[8.0,8.0,8.0])
>>   (8.0,[9.0,9.0,9.0])
>>   (9.0,[10.0,10.0,10.0])
>> in to training directory.
>> 
>> I can see that models weight change:
>> 15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: weights, [7.333333333333333,7.333333333333333,7.333333333333333]
>> 
>> No I can put what ever in to testing directory but I can not understand answer.
>> In example I can put the same file I used for training in to testing directory. File content is
>>   (1.0,[2.0,2.0,2.0])
>>   (2.0,[3.0,3.0,3.0])
>>   (3.0,[4.0,4.0,4.0])
>>   (4.0,[5.0,5.0,5.0])
>>   (5.0,[6.0,6.0,6.0])
>>   (6.0,[7.0,7.0,7.0])
>>   (7.0,[8.0,8.0,8.0])
>>   (8.0,[9.0,9.0,9.0])
>>   (9.0,[10.0,10.0,10.0])
>> 
>> And answer will be
>> (1.0,0.0)
>> (2.0,0.0)
>> (3.0,0.0)
>> (4.0,0.0)
>> (5.0,0.0)
>> (6.0,0.0)
>> (7.0,0.0)
>> (8.0,0.0)
>> (9.0,0.0)
>> 
>> And in case my file content is
>>   (0.0,[2.0,2.0,2.0])
>>   (0.0,[3.0,3.0,3.0])
>>   (0.0,[4.0,4.0,4.0])
>>   (0.0,[5.0,5.0,5.0])
>>   (0.0,[6.0,6.0,6.0])
>>   (0.0,[7.0,7.0,7.0])
>>   (0.0,[8.0,8.0,8.0])
>>   (0.0,[9.0,9.0,9.0])
>>   (0.0,[10.0,10.0,10.0])
>> 
>> the answer will be:
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> 
>> I except to get label predicted by model.
>> -- 
>> Margus (margusja) Roo
>> http://margus.roo.ee
>> skype: margusja
>> +372 51 480
> 


Re: Streaming linear regression example question

Posted by Margus Roo <ma...@roo.ee>.
Hi again

Tried the same 
examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala 
from 1.3.0
and getting in case testing file content is:
   (0.0,[3.0,4.0,3.0])
   (0.0,[4.0,4.0,4.0])
   (4.0,[5.0,5.0,5.0])
   (5.0,[5.0,6.0,6.0])
   (6.0,[7.0,4.0,7.0])
   (7.0,[8.0,6.0,8.0])
   (8.0,[44.0,9.0,9.0])
   (9.0,[14.0,30.0,10.0])

and the answer:
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(4.0,0.0)
(5.0,0.0)
(6.0,0.0)
(7.0,0.0)
(8.0,0.0)
(9.0,0.0)

What is wrong?
I can see that model's weights are changing in case I put new data into 
training dir.

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 14/03/15 09:05, Margus Roo wrote:
> Hi
>
> I try to understand example provided in 
> https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html - 
> Streaming linear regression
>
> Code:
> import org.apache.spark._
> import org.apache.spark.streaming._
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.regression.LabeledPoint
> import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.dstream.DStream
>
> object StreamingLinReg {
>
>   def main(args: Array[String]) {
>
>     val conf = new 
> SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
>     val ssc = new StreamingContext(conf, Seconds(10))
>
>
>     val trainingData = 
> ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/").map(LabeledPoint.parse).cache()
>
>     val testData = 
> ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/").map(LabeledPoint.parse)
>
>     val numFeatures = 3
>     val model = new 
> StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))
>
>     model.trainOn(trainingData)
>     model.predictOnValues(testData.map(lp => (lp.label, 
> lp.features))).print()
>
>     ssc.start()
>     ssc.awaitTermination()
>
>   }
>
> }
>
> Compiled code and run it
> Put file contains
>   (1.0,[2.0,2.0,2.0])
>   (2.0,[3.0,3.0,3.0])
>   (3.0,[4.0,4.0,4.0])
>   (4.0,[5.0,5.0,5.0])
>   (5.0,[6.0,6.0,6.0])
>   (6.0,[7.0,7.0,7.0])
>   (7.0,[8.0,8.0,8.0])
>   (8.0,[9.0,9.0,9.0])
>   (9.0,[10.0,10.0,10.0])
> in to training directory.
>
> I can see that models weight change:
> 15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current 
> model: weights, [7.333333333333333,7.333333333333333,7.333333333333333]
>
> No I can put what ever in to testing directory but I can not 
> understand answer.
> In example I can put the same file I used for training in to testing 
> directory. File content is
>   (1.0,[2.0,2.0,2.0])
>   (2.0,[3.0,3.0,3.0])
>   (3.0,[4.0,4.0,4.0])
>   (4.0,[5.0,5.0,5.0])
>   (5.0,[6.0,6.0,6.0])
>   (6.0,[7.0,7.0,7.0])
>   (7.0,[8.0,8.0,8.0])
>   (8.0,[9.0,9.0,9.0])
>   (9.0,[10.0,10.0,10.0])
>
> And answer will be
> (1.0,0.0)
> (2.0,0.0)
> (3.0,0.0)
> (4.0,0.0)
> (5.0,0.0)
> (6.0,0.0)
> (7.0,0.0)
> (8.0,0.0)
> (9.0,0.0)
>
> And in case my file content is
>   (0.0,[2.0,2.0,2.0])
>   (0.0,[3.0,3.0,3.0])
>   (0.0,[4.0,4.0,4.0])
>   (0.0,[5.0,5.0,5.0])
>   (0.0,[6.0,6.0,6.0])
>   (0.0,[7.0,7.0,7.0])
>   (0.0,[8.0,8.0,8.0])
>   (0.0,[9.0,9.0,9.0])
>   (0.0,[10.0,10.0,10.0])
>
> the answer will be:
> (0.0,0.0)
> (0.0,0.0)
> (0.0,0.0)
> (0.0,0.0)
> (0.0,0.0)
> (0.0,0.0)
> (0.0,0.0)
> (0.0,0.0)
> (0.0,0.0)
>
> I except to get label predicted by model.
> -- 
> Margus (margusja) Roo
> http://margus.roo.ee
> skype: margusja
> +372 51 480