Posted to user@spark.apache.org by "Bui, Tri" <Tr...@VerizonWireless.com.INVALID> on 2014/11/10 22:03:36 UTC

streaming linear regression is not building the model

Hi,

The model weights are not updating for streaming linear regression. The code and data below are what I am running.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setMaster("local[1]").setAppName("1feature")
val ssc = new StreamingContext(conf, Seconds(25))
val trainingData = ssc.textFileStream("file:///data/TrainStreamDir").map(LabeledPoint.parse)
val testData = ssc.textFileStream("file:///data/TestStreamDir").map(LabeledPoint.parse)
val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()

Sample data in TrainStreamDir:

(10240,[1,21,0])
(9936,[2,21,15])
(10118,[3,21,30])
(10174,[4,21,45])
(10460,[5,22,0])
(9961,[6,22,15])
(10372,[7,22,30])
(10666,[8,22,45])
(10300,[9,23,0])

Sample of output results:
14/11/10 15:52:55 INFO scheduler.JobScheduler: Added jobs for time 1415652775000 ms
14/11/10 15:52:55 INFO scheduler.JobScheduler: Starting job streaming job 1415652775000 ms.0 from job set of time 1415652775000 ms
14/11/10 15:52:55 INFO spark.SparkContext: Starting job: count at GradientDescent.scala:162
14/11/10 15:52:55 INFO spark.SparkContext: Job finished: count at GradientDescent.scala:162, took 3.1689E-5 s
14/11/10 15:52:55 INFO optimization.GradientDescent: GradientDescent.runMiniBatchSGD returning initial weights, no data found
14/11/10 15:52:55 INFO regression.StreamingLinearRegressionWithSGD: Model updated at time 1415652775000 ms
14/11/10 15:52:55 INFO regression.StreamingLinearRegressionWithSGD: Current model: weights, [0.0,0.0,0.0]

Thanks
Tri


Re: streaming linear regression is not building the model

Posted by tsu-wt <wa...@yahoo.com>.
I am having the same issue: the model still does not update for me. I am
running the example with bin/run-example.





Re: streaming linear regression is not building the model

Posted by Yanbo Liang <ya...@gmail.com>.
Computation is triggered only by new files added to the directory.
Once you place new files in the directory, it will start training the
model.
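The "no data found" line in the log is consistent with this: no new file arrived during that batch. As a rough sketch of one way to feed the stream, the helper below writes a file in a separate staging directory and then moves it into the monitored directory in a single step, so the stream never sees a half-written file. The object name DropTrainingFile, the function dropFile, and the staging directory are hypothetical and not part of Spark or of the original post. It assumes both directories are on the same local filesystem, since ATOMIC_MOVE can fail otherwise.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths, StandardCopyOption}

// Hypothetical helper, not part of Spark: stages a file elsewhere, then
// moves it into the directory that textFileStream is monitoring.
object DropTrainingFile {

  /** Writes `lines` to `stagingDir/name`, then moves the file into `watchedDir`.
    * Returns the final path of the file inside `watchedDir`.
    */
  def dropFile(stagingDir: Path, watchedDir: Path, name: String, lines: Seq[String]): Path = {
    Files.createDirectories(stagingDir)
    Files.createDirectories(watchedDir)

    // Write the full content outside the monitored directory first.
    val staged = stagingDir.resolve(name)
    Files.write(staged, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))

    // Assumption: both directories are on the same filesystem; otherwise
    // ATOMIC_MOVE may throw AtomicMoveNotSupportedException.
    Files.move(staged, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }

  // Usage: DropTrainingFile <stagingDir> <watchedDir>
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("usage: DropTrainingFile <stagingDir> <watchedDir>")
    } else {
      // A fresh file name per drop, so each file counts as new.
      val name = s"train-${System.currentTimeMillis()}.txt"
      val sample = Seq("(10240,[1,21,0])", "(9936,[2,21,15])", "(10118,[3,21,30])")
      val out = dropFile(Paths.get(args(0)), Paths.get(args(1)), name, sample)
      println(s"moved new training file to $out")
    }
  }
}
```

Run it after ssc.start() with the watched directory set to /data/TrainStreamDir from the original post. Files that were already in the directory before the streaming context started are not expected to be picked up.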
