You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Ravi Reddy <ra...@gmail.com> on 2015/03/25 18:09:13 UTC

Recovered state for updateStateByKey and incremental streams processing

I want to use the "restore from checkpoint" to continue from last accumulated
word counts and process new streams of data. This recovery process will keep
accurate state of accumulated counters state (calculated by
updateStateByKey) after "failure/recovery" or "temp shutdown/upgrade to new
code".

However, the recomendation seem to indicate you have to delete the
checkpoint data if you upgrade to new code. How would this work if I change
the word count accumulation logic and still want to continue to work from
last remembered state?. An example would be that the word counters could be
weighted in a logic that is used for streams that are coming from a
point-in-time later.

This is an example but there are quite a few scenarios where one needs to
continue from previous state as rememberd by "updateStateByKey" and apply
new logic. 

---- code snippets below ------------

As in example "RecoverableNetworkWordCount" we should build
"setupInputStreamAndProcessWordCounts" in the context of retrieved
checkpoint only. If "setupInputStreamAndProcessWordCounts" called outside
the "createContext", you will get error "[Receiver-0-1427260249292] is not
unique!".

  def createContext(checkPointDir: String, host: String, port : Int) = {
    // If you do not see this printed, that means the StreamingContext has
been loaded
    // from the new checkpoint
    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkPointDir)
    
    setupInputStreamAndProcessWordCounts(ssc, host, port)
    
    ssc
  }

and invoke in main as

  def main(args: Array[String]) {
    val checkPointDir = "./saved_state"

    if (args.length < 2) {
      System.err.println("Usage: SavedNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val ssc = StreamingContext.getOrCreate(checkPointDir, () => {
        createContext(checkPointDir,args(0), args(1).toInt)
      })
    
    //setupInputStreamAndProcessWordCounts(ssc, args(0), args(1).toInt)

    ssc.start()
    ssc.awaitTermination()
  }
 }

  def setupInputStreamAndProcessWordCounts(ssc: StreamingContext, hostname:
String, port: Int) {
    // InputDStream has to be created inside createContext, else you get an
error
    val lines = ssc.socketTextStream(hostname, port)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
        
    // Update and print the cumulative count using updateStateByKey
    val countsDstream = wordDstream.updateStateByKey[Int](updateFunc)
    countsDstream.print() // print or save to external system
  }
  



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Recovered-state-for-updateStateByKey-and-incremental-streams-processing-tp22229.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Recovered state for updateStateByKey and incremental streams processing

Posted by manasdebashiskar <po...@gmail.com>.
If you are using spark 1.6 onwards there is a better solution for you.
It is called mapwithState

mapwithState takes a state function and an initial RDD.

1) When you start your program for the first time/OR version changes and new
code can't use the checkpoint, the initialRDD comes handy.
2) For the rest of the occasion(i.e. program re-start after failure, or
regular stop/start for the same version) the checkpoint works for you.

Also, mapwithstate is easier to reason about then updatestatebykey and is
optimized to handle larger amount of data for the same amount of memory.

I use the same mechanism in production to great success.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Recovered-state-for-updateStateByKey-and-incremental-streams-processing-tp22229p27747.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org