You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Guillermo Ortiz <ko...@gmail.com> on 2015/07/28 10:14:15 UTC

Checkpoints in SparkStreaming

I'm using SparkStreaming and I want to configure checkpoint to manage
fault-tolerance.
I've been reading the documentation. Is it necessary to create and
configure the InputDSStream in the getOrCreate function?

I checked the example in
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
and it looks like it does everything inside of the function. Should I put
all the logic of the application inside on it?? I think that that's not the
way...

If I just create the context I got an error:
Exception in thread "main" org.apache.spark.SparkException:
org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e12a5a6 has not
been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
at
org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)


I'm not pretty good with Scala.. the code that I did
  def functionToCreateContext(): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("app")
    val ssc = new StreamingContext(sparkConf, Seconds(5))   // new context

    ssc.checkpoint("/tmp/spark/metricsCheckpoint")   // set checkpoint
directory
    ssc
  }


    val ssc = StreamingContext.getOrCreate("/tmp/spark/metricsCheckpoint",
functionToCreateContext _)
    val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
    val topics = args(1).split("\\,")
    val directKafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)

    directKafkaStream.foreachRDD { rdd => ...

Re: Checkpoints in SparkStreaming

Posted by Cody Koeninger <co...@koeninger.org>.
Yes, you need to follow the documentation.  Configure your stream,
including the transformations made to it, inside the getOrCreate function.

On Tue, Jul 28, 2015 at 3:14 AM, Guillermo Ortiz <ko...@gmail.com>
wrote:

> I'm using SparkStreaming and I want to configure checkpoint to manage
> fault-tolerance.
> I've been reading the documentation. Is it necessary to create and
> configure the InputDSStream in the getOrCreate function?
>
> I checked the example in
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
> and it looks like it does everything inside of the function. Should I put
> all the logic of the application inside on it?? I think that that's not the
> way...
>
> If I just create the context I got an error:
> Exception in thread "main" org.apache.spark.SparkException:
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e12a5a6 has not
> been initialized
> at
> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
> at
> org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)
>
>
> I'm not pretty good with Scala.. the code that I did
>   def functionToCreateContext(): StreamingContext = {
>     val sparkConf = new
> SparkConf().setMaster("local[2]").setAppName("app")
>     val ssc = new StreamingContext(sparkConf, Seconds(5))   // new context
>
>     ssc.checkpoint("/tmp/spark/metricsCheckpoint")   // set checkpoint
> directory
>     ssc
>   }
>
>
>     val ssc = StreamingContext.getOrCreate("/tmp/spark/metricsCheckpoint",
> functionToCreateContext _)
>     val kafkaParams = Map[String, String]("metadata.broker.list" ->
> args(0))
>     val topics = args(1).split("\\,")
>     val directKafkaStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)
>
>     directKafkaStream.foreachRDD { rdd => ...
>
>