Posted to user@spark.apache.org by Margus Roo <ma...@roo.ee> on 2015/03/14 08:05:28 UTC
Streaming linear regression example question
Hi
I am trying to understand the streaming linear regression example provided in
https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html
Code:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
object StreamingLinReg {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    val trainingData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/").map(LabeledPoint.parse).cache()
    val testData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/").map(LabeledPoint.parse)
    val numFeatures = 3
    val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))
    model.trainOn(trainingData)
    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
I compiled the code and ran it, then put a file containing
(1.0,[2.0,2.0,2.0])
(2.0,[3.0,3.0,3.0])
(3.0,[4.0,4.0,4.0])
(4.0,[5.0,5.0,5.0])
(5.0,[6.0,6.0,6.0])
(6.0,[7.0,7.0,7.0])
(7.0,[8.0,8.0,8.0])
(8.0,[9.0,9.0,9.0])
(9.0,[10.0,10.0,10.0])
into the training directory.
I can see that the model's weights change:
15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: weights, [7.333333333333333,7.333333333333333,7.333333333333333]
Now I can put whatever I like into the testing directory, but I cannot
understand the answer.
For example, I can put the same file I used for training into the testing
directory. The file content is
(1.0,[2.0,2.0,2.0])
(2.0,[3.0,3.0,3.0])
(3.0,[4.0,4.0,4.0])
(4.0,[5.0,5.0,5.0])
(5.0,[6.0,6.0,6.0])
(6.0,[7.0,7.0,7.0])
(7.0,[8.0,8.0,8.0])
(8.0,[9.0,9.0,9.0])
(9.0,[10.0,10.0,10.0])
And the answer will be
(1.0,0.0)
(2.0,0.0)
(3.0,0.0)
(4.0,0.0)
(5.0,0.0)
(6.0,0.0)
(7.0,0.0)
(8.0,0.0)
(9.0,0.0)
And if my file content is
(0.0,[2.0,2.0,2.0])
(0.0,[3.0,3.0,3.0])
(0.0,[4.0,4.0,4.0])
(0.0,[5.0,5.0,5.0])
(0.0,[6.0,6.0,6.0])
(0.0,[7.0,7.0,7.0])
(0.0,[8.0,8.0,8.0])
(0.0,[9.0,9.0,9.0])
(0.0,[10.0,10.0,10.0])
the answer will be:
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
I expect to get the label predicted by the model.
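For reference, a linear regression model predicts the dot product of its weights and the feature vector, plus an intercept. The sketch below is plain Scala with no Spark dependency, and it assumes a zero intercept; `LinearPredictionSketch`, `dot` and `predict` are hypothetical names used only for illustration. It shows that the logged weights would give a prediction of roughly 44 for the first test row, while an all-zero weight vector gives 0.0 for any input, which matches the output reported above.

```scala
object LinearPredictionSketch {
  // Dot product of two equal-length vectors.
  def dot(w: Array[Double], x: Array[Double]): Double = {
    require(w.length == x.length, "weights and features must have the same length")
    w.zip(x).map { case (wi, xi) => wi * xi }.sum
  }

  // Linear-model prediction; the zero default intercept is an assumption of this sketch.
  def predict(w: Array[Double], x: Array[Double], intercept: Double = 0.0): Double =
    dot(w, x) + intercept

  def main(args: Array[String]): Unit = {
    val initial  = Array(0.0, 0.0, 0.0)             // same values as Vectors.zeros(3)
    val logged   = Array.fill(3)(7.333333333333333) // weights copied from the log line
    val features = Array(2.0, 2.0, 2.0)             // first row of the test file

    println(predict(initial, features)) // zero weights always give 0.0
    println(predict(logged, features))  // roughly 44, clearly not 0.0
  }
}
```

So predictions that stay at 0.0 while the logged weights are non-zero suggest that the prediction step is not using the updated weights.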
--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480
Re: Streaming linear regression example question
Posted by Margus Roo <ma...@roo.ee>.
Thanks for the workaround.
On 16/03/15 06:20, Jeremy Freeman wrote:
> Hi Margus, thanks for reporting this, I’ve been able to reproduce and
> there does indeed appear to be a bug. I’ve created a JIRA and have a
> fix ready, can hopefully include in 1.3.1.
Re: Streaming linear regression example question
Posted by Jeremy Freeman <fr...@gmail.com>.
Hi Margus, thanks for reporting this, I’ve been able to reproduce and there does indeed appear to be a bug. I’ve created a JIRA and have a fix ready, can hopefully include in 1.3.1.
In the meantime, you can get the desired result using transform:
    model.trainOn(trainingData)

    // testData is the test stream defined in the original program
    testData.transform { rdd =>
      val latest = model.latestModel()
      rdd.map(lp => (lp.label, latest.predict(lp.features)))
    }.print()
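The workaround calls `model.latestModel()` inside the function passed to `transform`, and that function is run again for every batch, so each batch is scored with whatever the model is at that moment. The toy sketch below, plain Scala with no Spark involved, illustrates the general difference between capturing a model once and looking it up on each call. `ToyModel`, `update` and `latest` are hypothetical names; the thread does not say what the actual bug was, so this is only an illustration of the pattern, not a diagnosis.

```scala
object SnapshotVsLatestSketch {
  // Hypothetical stand-in for a streaming model whose weights change over time.
  final class ToyModel(initial: Array[Double]) {
    @volatile private var weights: Array[Double] = initial

    def update(w: Array[Double]): Unit = { weights = w }

    // Returns a prediction function bound to the weights at the time of the call.
    def latest(): Array[Double] => Double = {
      val w = weights
      x => w.zip(x).map { case (wi, xi) => wi * xi }.sum
    }
  }

  def main(args: Array[String]): Unit = {
    val model = new ToyModel(Array(0.0, 0.0, 0.0))

    // Snapshot taken once, before any training has happened.
    val snapshot = model.latest()
    // Looked up again on every call, like the body passed to transform.
    val perBatch: Array[Double] => Double = x => model.latest()(x)

    model.update(Array(1.0, 1.0, 1.0)) // pretend a training batch arrived

    val features = Array(2.0, 2.0, 2.0)
    println(snapshot(features)) // still uses the zero weights: 0.0
    println(perBatch(features)) // uses the updated weights: 6.0
  }
}
```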
-------------------------
jeremyfreeman.net
@thefreemanlab
On Mar 15, 2015, at 2:56 PM, Margus Roo <ma...@roo.ee> wrote:
> Hi again
>
> Tried the same examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala from 1.3.0
> and getting in case testing file content is:
> (0.0,[3.0,4.0,3.0])
> (0.0,[4.0,4.0,4.0])
> (4.0,[5.0,5.0,5.0])
> (5.0,[5.0,6.0,6.0])
> (6.0,[7.0,4.0,7.0])
> (7.0,[8.0,6.0,8.0])
> (8.0,[44.0,9.0,9.0])
> (9.0,[14.0,30.0,10.0])
>
> and the answer:
> (0.0,0.0)
> (0.0,0.0)
> (0.0,0.0)
> (4.0,0.0)
> (5.0,0.0)
> (6.0,0.0)
> (7.0,0.0)
> (8.0,0.0)
> (9.0,0.0)
>
> What is wrong?
> I can see that model's weights are changing in case I put new data into training dir.
> Margus (margusja) Roo
> http://margus.roo.ee
> skype: margusja
> +372 51 480
Re: Streaming linear regression example question
Posted by Margus Roo <ma...@roo.ee>.
Hi again
I tried the same example,
examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala,
from 1.3.0. The testing file content is:
(0.0,[3.0,4.0,3.0])
(0.0,[4.0,4.0,4.0])
(4.0,[5.0,5.0,5.0])
(5.0,[5.0,6.0,6.0])
(6.0,[7.0,4.0,7.0])
(7.0,[8.0,6.0,8.0])
(8.0,[44.0,9.0,9.0])
(9.0,[14.0,30.0,10.0])
and the answer is:
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(4.0,0.0)
(5.0,0.0)
(6.0,0.0)
(7.0,0.0)
(8.0,0.0)
(9.0,0.0)
What is wrong?
I can see that the model's weights change when I put new data into the
training directory.