Posted to user@spark.apache.org by Ravi Reddy <ra...@gmail.com> on 2015/03/25 18:09:13 UTC
Recovered state for updateStateByKey and incremental streams
processing
I want to use "restore from checkpoint" to continue from the last accumulated
word counts while processing new streams of data. Recovery should preserve the
accumulated counter state (calculated by updateStateByKey) across a
"failure/recovery" or a "temp shutdown/upgrade to new code".
However, the recommendation seems to be that you have to delete the
checkpoint data when you upgrade to new code. How would this work if I change
the word count accumulation logic and still want to continue from the
last remembered state? For example, the new logic might apply a weight to the
word counts for streams that arrive after a given point in time.
This is just one example; there are quite a few scenarios where one needs to
continue from the previous state as remembered by "updateStateByKey" and apply
new logic.
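To make the example concrete, here is a minimal sketch of the two versions of
the update logic in plain Scala, with no Spark dependency. The object name, the
function names, and the weight are hypothetical. It only shows that the new
function can consume totals produced by the old one because the state type is
unchanged; it does not by itself make an old checkpoint loadable by new code.

```scala
// Hypothetical sketch: WeightedCounts, weightedUpdateFunc and the weight
// value are illustrative assumptions, not part of any Spark API.
object WeightedCounts {
  // Current logic from this post: add this batch's occurrences to the running total.
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(values.sum + state.getOrElse(0))

  // New logic: every occurrence seen from now on counts `weight` times.
  // The state type is still Option[Int], so totals accumulated by
  // updateFunc remain valid input for this function.
  def weightedUpdateFunc(weight: Int): (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(values.sum * weight + state.getOrElse(0))
}
```

With a previous total of 4 and three new occurrences, the old function gives 7,
while the weighted one with weight 2 gives 10.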
---- code snippets below ------------
As in the "RecoverableNetworkWordCount" example, we should call
"setupInputStreamAndProcessWordCounts" only from inside "createContext", so
that the streams are set up only when a new context is created and not when
one is restored from the checkpoint. If "setupInputStreamAndProcessWordCounts"
is called outside "createContext", you will get the error
"[Receiver-0-1427260249292] is not unique!".
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(checkPointDir: String, host: String, port: Int): StreamingContext = {
  // If you do not see this printed, the StreamingContext has been loaded
  // from the checkpoint instead of being created here
  println("Creating new context")
  val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
  // Create the context with a 1 second batch size
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  ssc.checkpoint(checkPointDir)
  // Build the input stream and the stateful word count on the new context
  setupInputStreamAndProcessWordCounts(ssc, host, port)
  ssc
}
and invoke in main as
def main(args: Array[String]): Unit = {
  val checkPointDir = "./saved_state"
  if (args.length < 2) {
    System.err.println("Usage: SavedNetworkWordCount <hostname> <port>")
    System.exit(1)
  }
  // Restore the context from the checkpoint if one exists, else create it
  val ssc = StreamingContext.getOrCreate(checkPointDir, () => {
    createContext(checkPointDir, args(0), args(1).toInt)
  })
  // Calling this here, outside createContext, triggers the "is not unique!" error:
  //setupInputStreamAndProcessWordCounts(ssc, args(0), args(1).toInt)
  ssc.start()
  ssc.awaitTermination()
}
}
def setupInputStreamAndProcessWordCounts(ssc: StreamingContext, hostname: String, port: Int): Unit = {
  // InputDStream has to be created inside createContext, else you get an error
  val lines = ssc.socketTextStream(hostname, port)
  val words = lines.flatMap(_.split(" "))
  val wordDstream = words.map(x => (x, 1))
  // Add the counts from the current batch to the previously stored count
  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.sum
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }
  // Update and print the cumulative count using updateStateByKey
  val countsDstream = wordDstream.updateStateByKey[Int](updateFunc)
  countsDstream.print() // print or save to external system
}
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Recovered-state-for-updateStateByKey-and-incremental-streams-processing-tp22229.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: Recovered state for updateStateByKey and incremental streams
processing
Posted by manasdebashiskar <po...@gmail.com>.
If you are using Spark 1.6 or later, there is a better solution for you.
It is called mapWithState.
mapWithState takes a StateSpec, which wraps a state mapping function and,
optionally, an initial state RDD.
1) When you start your program for the first time, or when the version changes
and the new code can't use the checkpoint, the initial RDD comes in handy.
2) On all other occasions (i.e. a program restart after failure, or a
regular stop/start of the same version), the checkpoint works for you.
Also, mapWithState is easier to reason about than updateStateByKey and is
optimized to handle a larger amount of data in the same amount of memory.
I use the same mechanism in production to great success.
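A minimal sketch of how this could look, assuming Spark 1.6+ and reusing the
word count from the original post. The object name, the checkpoint directory,
and the hard-coded initial pairs are hypothetical; in a real job the initial
RDD would be loaded from wherever the previous version saved its counts.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

// Hypothetical sketch, not a definitive implementation.
object MapWithStateSketch {
  def createContext(checkpointDir: String, host: String, port: Int): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("MapWithStateSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    // Assumption: these pairs stand in for counts exported by the old version.
    val initialRDD = ssc.sparkContext.parallelize(Seq(("hello", 10), ("world", 5)))

    // Add the new occurrence (if any) to the stored count and emit the total.
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (word, sum)
    }

    val wordDstream = ssc.socketTextStream(host, port)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // initialState seeds the per-key state only when the context is built here;
    // a context restored from the checkpoint keeps its checkpointed state.
    val stateDstream = wordDstream.mapWithState(
      StateSpec.function(mappingFunc).initialState(initialRDD))
    stateDstream.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a fresh directory is used after a code upgrade.
    val checkpointDir = "./saved_state_v2"
    val ssc = StreamingContext.getOrCreate(checkpointDir,
      () => createContext(checkpointDir, args(0), args(1).toInt))
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The design choice here is that the checkpoint covers restarts of the same
version, while the initial RDD carries the counts across a version change.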
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Recovered-state-for-updateStateByKey-and-incremental-streams-processing-tp22229p27747.html