You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Shing Hing Man <ma...@yahoo.com.INVALID> on 2017/11/18 15:25:10 UTC

Spark 2.1.2 Spark Streaming checkpoint interval not respected

Hi, 
In the following example using mapWithState, I set the checkpoint interval to 1 minute. However, the log shows that Spark still writes to the checkpoint directory every second. I would appreciate it if someone could point out what I have done wrong.
/**
 * Stateful streaming word count built on `mapWithState`.
 *
 * Reads newline-delimited text from a TCP socket (e.g. one fed by `nc`), splits each line on
 * single spaces and keeps a running count per word. Word state that receives no input for
 * 10 seconds is expired.
 *
 * Usage: MapWithStateDemo <hostname> <port> [checkpointDir]
 *
 * Checkpointing notes:
 *  - `ssc.checkpoint(dir)` turns on checkpointing into `dir`. Per the Spark Streaming
 *    programming guide, metadata checkpoints are written every batch. With the 1-second batch
 *    interval below, the directory is therefore written every second no matter what DStream
 *    checkpoint interval is configured.
 *  - `stateDstream.checkpoint(...)` only controls how often the state RDDs (data checkpoints)
 *    are saved. The interval must be a multiple of the batch interval.
 */
object MapWithStateDemo {

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      exitWithError("Usage: MapWithStateDemo <hostname> <port> [checkpointDir]")
    }
    val host = args(0)
    val port = scala.util.Try(args(1).toInt).getOrElse(exitWithError(s"Invalid port: ${args(1)}"))
    // An explicit directory is preferred. The classpath-derived default only works when the
    // classes are loaded from a directory on the local file system.
    val checkpointDir = if (args.length > 2) args(2) else defaultCheckpointDir

    val sparkConf = new SparkConf().setAppName("MapWithStateDemo")
      .setIfMissing("spark.master", "local[*]")

    // Create the context with a 1 second batch interval.
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Initial state RDD for the mapWithState operation.
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Create a ReceiverInputDStream on host:port and count the words in the
    // \n-delimited text it delivers (e.g. generated by 'nc').
    val lines = ssc.socketTextStream(host, port)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using mapWithState.
    // The resulting DStream emits (word, cumulative count) for every word seen in a batch.
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      // When a key's timeout expires, Spark calls this function one final time with no value
      // and isTimingOut() == true. Updating a timing-out state is rejected (the job would
      // fail), so only live state is updated.
      if (!state.isTimingOut()) {
        state.update(sum)
      }
      (word, sum)
    }

    val stateDstream: MapWithStateDStream[String, Int, Int, (String, Int)] =
      wordDstream.mapWithState(StateSpec.function(mappingFunc).timeout(Seconds(10)).initialState(initialRDD))

    // Data-checkpoint interval for the state RDDs (must be a multiple of the batch interval).
    // This does NOT change how often metadata checkpoints are written; those happen every batch.
    // NOTE(review): confirm that in this Spark version checkpoint() on the MapWithStateDStream
    // wrapper is forwarded to the internal state stream.
    stateDstream.checkpoint(Minutes(1L))
    stateDstream.print()

    ssc.checkpoint(checkpointDir)
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Default checkpoint directory: a `checkpoint` directory two levels above the classpath
   * root. This is presumably the build output directory, e.g. `target/` — assumption, verify
   * against the build layout.
   *
   * Falls back to `<java.io.tmpdir>/MapWithStateDemo-checkpoint` when the classpath root is not
   * a local `file:` URL. That happens, for example, when `getResource("/")` returns null
   * inside a jar.
   */
  private def defaultCheckpointDir: String =
    Option(getClass.getResource("/"))
      .filter(_.getProtocol == "file")
      .flatMap(url => Option(new File(url.toURI).getParentFile))
      .flatMap(dir => Option(dir.getParentFile))
      .map(targetDir => new File(targetDir, "checkpoint").getPath)
      .getOrElse(new File(System.getProperty("java.io.tmpdir"), "MapWithStateDemo-checkpoint").getPath)

  /** Prints `message` to stderr and terminates the JVM with exit status 1. */
  private def exitWithError(message: String): Nothing = {
    System.err.println(message)
    sys.exit(1)
  }
}
Thanks in advance for any assistance!
Shing