Posted to user@spark.apache.org by Vinti Maheshwari <vi...@gmail.com> on 2016/02/27 21:28:25 UTC
Spark streaming not remembering previous state
Hi All,
I wrote a Spark Streaming program with a stateful transformation (updateStateByKey).
My Spark Streaming application seems to compute correctly with checkpointing.
But when I terminate my program and start it again, it does not read the
previous checkpoint data and starts from the beginning. Is this the
expected behaviour?
Do I need to change anything in my program so that it remembers the
previous data and continues the computation from there?
Thanks in advance.
For reference, my program:
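import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import breeze.linalg.{DenseVector => BDV}
import scala.util.Try

object HBaseStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    // Always builds a fresh context: checkpoint data is written, but never read back on restart
    val ssc = new StreamingContext(sc, Seconds(5))
    val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", 9999)
    ssc.checkpoint("hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir")
    inputStream.print(1)

    // Each line "<ignored>,<key>,<n1>,<n2>,..." becomes (key, Array(n1, n2, ...))
    val parsedStream = inputStream.map { line =>
      val splitLines = line.split(",")
      (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
    }

    // Sum the previous state and all new arrays for a key element-wise;
    // any failure (e.g. mismatched lengths) yields None, which drops the key's state
    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })
    state.checkpoint(Duration(10000))
    // Blaher.blah is the poster's own output function (not shown)
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))

    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()
  }
}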
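For readers checking what the update function above computes, here is a Spark-free sketch of the same per-key logic. It assumes plain Scala arrays in place of Breeze's DenseVector, purely so it runs without Spark or Breeze; `UpdateFunctionSketch` and `updateState` are hypothetical names.

```scala
object UpdateFunctionSketch {
  // Hypothetical stand-in for the Breeze-based update function:
  // sums the previous state and all new arrays for one key element-wise.
  def updateState(current: Seq[Array[Long]], prev: Option[Array[Long]]): Option[Array[Long]] = {
    val all: Seq[Array[Long]] = prev.toList ++ current
    if (all.isEmpty) None                                  // no state and no data: nothing to keep
    else if (all.map(_.length).distinct.size != 1) None    // mismatched lengths: mirrors Try(...).toOption failing
    else Some(all.reduce((a: Array[Long], b: Array[Long]) => a.zip(b).map { case (x, y) => x + y }))
  }

  def main(args: Array[String]): Unit = {
    val state1 = updateState(Seq(Array(1L, 2L), Array(3L, 4L)), None)
    println(state1.map(_.mkString(",")))  // Some(4,6)
    val state2 = updateState(Seq(Array(10L, 10L)), state1)
    println(state2.map(_.mkString(",")))  // Some(14,16)
  }
}
```

Returning None from an updateStateByKey function removes that key's state, so in the original program a single line with a different number of values would silently discard the key's accumulated totals. The sketch keeps that behaviour rather than hiding it.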
Regards,
~Vinti
Re: Spark streaming not remembering previous state
Posted by Vinti Maheshwari <vi...@gmail.com>.
Thanks much Amit, Sebastian. It worked.
Regards,
~Vinti
On Sat, Feb 27, 2016 at 12:44 PM, Amit Assudani <aa...@impetus.com>
wrote:
> Your context is not being created using checkpoints, use get or create,
Re: Spark streaming not remembering previous state
Posted by Amit Assudani <aa...@impetus.com>.
Your context is not being created from the checkpoint. Use StreamingContext.getOrCreate.
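Below is a minimal sketch of the getOrCreate pattern, loosely following the structure of Spark's RecoverableNetworkWordCount example. It rests on a few assumptions. `RecoverableHBaseStream` and `createContext` are hypothetical names. The Breeze sum is replaced by a plain element-wise sum. The poster's `Blaher.blah` output is replaced by `print()`, so the block is self-contained. The key point is that every DStream is defined inside the factory function passed to `StreamingContext.getOrCreate`. On a restart with existing checkpoint data, Spark rebuilds the DStream graph and its state from the checkpoint and does not call the factory.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object RecoverableHBaseStream {
  // Same checkpoint directory as in the original program
  val CheckpointDir = "hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir"

  // Builds a brand-new context; only called when no checkpoint exists yet,
  // so ALL DStream setup must happen inside this function.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(CheckpointDir)

    val parsed = ssc.socketTextStream("ttsv-vccp-01.juniper.net", 9999).map { line =>
      val fields = line.split(",")
      (fields(1), fields.drop(2).map(_.trim.toLong))
    }
    // Simplified element-wise sum in place of the Breeze version;
    // note that zip truncates to the shorter array instead of failing.
    val state: DStream[(String, Array[Long])] = parsed.updateStateByKey[Array[Long]] {
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>
        (prev.toList ++ current).reduceOption((a: Array[Long], b: Array[Long]) =>
          a.zip(b).map { case (x, y) => x + y })
    }
    state.checkpoint(Seconds(10))
    state.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recovers the context (including updateStateByKey state) from CheckpointDir
    // if checkpoint data exists there; otherwise calls createContext().
    val ssc = StreamingContext.getOrCreate(CheckpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because the checkpoint holds serialized DStream objects, changing the streaming code may prevent recovery from an old checkpoint directory. In that case, the directory has to be cleared or a new one used.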
NOTE: This message may contain information that is confidential, proprietary, privileged or otherwise protected by law. The message is intended solely for the named addressee. If received in error, please destroy and notify the sender. Any use of this email is prohibited when received in error. Impetus does not represent, warrant and/or guarantee, that the integrity of this communication has been maintained nor that the communication is free of errors, virus, interception or interference.
Re: Spark streaming not remembering previous state
Posted by Sebastian Piu <se...@gmail.com>.
Here:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
On Sat, 27 Feb 2016, 20:42 Sebastian Piu, <se...@gmail.com> wrote:
> You need to create the streaming context using an existing checkpoint for
> it to work
>
> See sample here
Re: Spark streaming not remembering previous state
Posted by Sebastian Piu <se...@gmail.com>.
You need to create the streaming context from an existing checkpoint for
it to work.
See sample here