You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by suchenzang <su...@gmail.com> on 2015/08/25 19:53:28 UTC

Spark Streaming Checkpointing Restarts with 0 Event Batches

Hello,

I'm using direct spark streaming (from kafka) with checkpointing, and
everything works well until a restart. When I shut down (^C) the first
streaming job, wait 1 minute, then re-submit, there is somehow a series of 0
event batches that get queued (corresponding to the 1 minute when the job
was down). Eventually, the batches would resume processing, and I would see
that each batch has roughly 2000 events.

I see that at the beginning of the second launch, the checkpoint dirs are
found and "loaded", according to console output.

Is this expected behavior? It seems like I might've configured something
incorrectly, since I would expect with checkpointing that the streaming job
would resume from checkpoint and continue processing from there (without
seeing 0 event batches corresponding to when the job was down).

Also, if I were to wait > 10 minutes or so before re-launching, there would
be so many 0 event batches that the job would hang. Is this merely something
to be "waited out", or should I set up some restart behavior/make a config
change to discard checkpointing if the elapsed time has been too long?

Thanks!

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png> 



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

Posted by Susan Zhang <su...@gmail.com>.
Ah, I was using the UI coupled with the job logs indicating that offsets
were being "processed" even though it corresponded to 0 events. Looks like
I wasn't matching up timestamps correctly: the 0 event batches were
queued/processed when offsets were getting skipped:

15/08/26 11:26:05 INFO storage.BlockManager: Removing RDD 0
15/08/26 11:26:05 INFO kafka.KafkaRDD: Beginning offset 831729964 is the
same as ending offset skipping install-json 1
15/08/26 11:26:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0
non-empty blocks out of 6 blocks
15/08/26 11:26:08 INFO storage.BlockManager: Removing RDD 1

But eventually processing of offset 831729964 would resume:

15/08/26 11:27:18 INFO kafka.KafkaRDD: Computing topic install-json,
partition 1 offsets 831729964 -> 831729976

Lesson learned: will be more focused on reading the job logs properly in
the future.


Thanks for all the help on this!


On Wed, Aug 26, 2015 at 12:16 PM, Cody Koeninger <co...@koeninger.org> wrote:

> I'd be less concerned about what the streaming ui shows than what's
> actually going on with the job.  When you say you were losing messages, how
> were you observing that?  The UI, or actual job output?
>
> The log lines you posted indicate that the checkpoint was restored and
> those offsets were processed; what are the log lines for the following
> KafkaRDD ?
>
>
> On Wed, Aug 26, 2015 at 2:04 PM, Susan Zhang <su...@gmail.com> wrote:
>
>> Compared offsets, and it continues from checkpoint loading:
>>
>> 15/08/26 11:24:54 INFO
>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>> KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446),
>> (install-json,4,825772921,825773536),
>> (install-json,1,831654775,831655076),
>> (install-json,0,1296018451,1296018810),
>> (install-json,2,824785282,824785696), (install-json,3,
>> 811428882,811429181)]
>>
>> 15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 0 offsets 1296018451 -> 1296018810
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 4 offsets 825773536 -> 825907428
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 2 offsets 824785696 -> 824889957
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 3 offsets 811429181 -> 811529084
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 1 offsets 831655076 -> 831729964
>> ...
>>
>> But for some reason the streaming UI shows it as computing 0 events.
>>
>> Removing the call to checkpoint does remove the queueing of 0 event
>> batches, since offsets just skip to the latest (checked that the first
>> part.fromOffset in the restarted job is larger than the last
>> part.untilOffset before restart).
>>
>>
>>
>>
>> On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> When the kafka rdd is actually being iterated on the worker, there
>>> should be an info line of the form
>>>
>>>     log.info(s"Computing topic ${part.topic}, partition
>>> ${part.partition} " +
>>>
>>>       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>>
>>>
>>> You should be able to compare that to log of offsets during checkpoint
>>> loading, to see if they line up.
>>>
>>> Just out of curiosity, does removing the call to checkpoint on the
>>> stream affect anything?
>>>
>>>
>>>
>>> On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang <su...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the suggestions! I tried the following:
>>>>
>>>> I removed
>>>>
>>>> createOnError = true
>>>>
>>>> And reran the same process to reproduce. Double checked that checkpoint
>>>> is loading:
>>>>
>>>> 15/08/26 10:10:40 INFO
>>>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>>>> KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
>>>> (install-json,4,825400856,825401058),
>>>> (install-json,1,831453228,831453396),
>>>> (install-json,0,1295759888,1295760378),
>>>> (install-json,2,824443526,824444409), (install-json,3,
>>>> 811222580,811222874)]
>>>> 15/08/26 10:10:40 INFO
>>>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>>>> KafkaRDD for time 1440608830000 ms [(install-json,5,825898528,825898791),
>>>> (install-json,4,825401058,825401249),
>>>> (install-json,1,831453396,831453603),
>>>> (install-json,0,1295760378,1295760809),
>>>> (install-json,2,824444409,824445510), (install-json,3,
>>>> 811222874,811223285)]
>>>> ...
>>>>
>>>> And the same issue is appearing as before (with 0 event batches getting
>>>> queued corresponding to dropped messages). Our kafka brokers are on version
>>>> 0.8.2.0, if that makes a difference.
>>>>
>>>> Also as a sanity check, I took out the ZK updates and reran (just in
>>>> case that was somehow causing problems), and that didn't change anything as
>>>> expected.
>>>>
>>>> Furthermore, the 0 event batches seem to take longer to process than
>>>> batches with the regular load of events: processing time for 0 event
>>>> batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
>>>> event batches is consistently < 1s. Why would that happen?
>>>>
>>>>
>>>> As for the checkpoint call:
>>>>
>>>> directKStream.checkpoint(checkpointDuration)
>>>>
>>>> was an attempt to set the checkpointing interval (at some multiple of
>>>> the batch interval), whereas StreamingContext.checkpoint seems like it will
>>>> only set the checkpoint directory.
>>>>
>>>>
>>>>
>>>> Thanks for all the help,
>>>>
>>>> Susan
>>>>
>>>>
>>>> On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> The first thing that stands out to me is
>>>>> createOnError = true
>>>>>
>>>>> Are you sure the checkpoint is actually loading, as opposed to failing
>>>>> and starting the job anyway?  There should be info lines that look like
>>>>>
>>>>> INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
>>>>> Restoring KafkaRDD for time 1440597180000 ms [(test,1,162,220)
>>>>>
>>>>>
>>>>> You should be able to tell from those whether the offset ranges being
>>>>> loaded from the checkpoint look reasonable.
>>>>>
>>>>> Also, is there a reason you're calling
>>>>>
>>>>> directKStream.checkpoint(checkpointDuration)
>>>>>
>>>>> Just calling checkpoint on the streaming context should be sufficient
>>>>> to save the metadata
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang <su...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sure thing!
>>>>>>
>>>>>> The main looks like:
>>>>>>
>>>>>>
>>>>>> --------------------------------------------------------------------------------------------------
>>>>>>
>>>>>>
>>>>>> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>>>>>>
>>>>>> val kafkaConf = Map(
>>>>>>       "zookeeper.connect" -> zookeeper,
>>>>>>       "group.id" -> options.group,
>>>>>>       "zookeeper.connection.timeout.ms" -> "10000",
>>>>>>       "auto.commit.interval.ms" -> "1000",
>>>>>>       "rebalance.max.retries" -> "25",
>>>>>>       "bootstrap.servers" -> kafkaBrokers
>>>>>>     )
>>>>>>
>>>>>> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>>>>>>       () => {
>>>>>>         createContext(kafkaConf, checkpointDirectory, topic,
>>>>>> numThreads, isProd)
>>>>>>       }, createOnError = true)
>>>>>>
>>>>>> ssc.start()
>>>>>> ssc.awaitTermination()
>>>>>>
>>>>>>
>>>>>>
>>>>>> --------------------------------------------------------------------------------------------------
>>>>>>
>>>>>>
>>>>>> And createContext is defined as:
>>>>>>
>>>>>>
>>>>>>
>>>>>> --------------------------------------------------------------------------------------------------
>>>>>>
>>>>>>
>>>>>> val batchDuration = Seconds(5)
>>>>>> val checkpointDuration = Seconds(20)
>>>>>>
>>>>>> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>>>>>>
>>>>>> def createContext(kafkaConf: Map[String, String],
>>>>>>                     checkpointDirectory: String,
>>>>>>                     topic: String,
>>>>>>                     numThreads: Int,
>>>>>>                     isProd: Boolean)
>>>>>>   : StreamingContext = {
>>>>>>
>>>>>>     val sparkConf = new SparkConf().setAppName("***")
>>>>>>     val ssc = new StreamingContext(sparkConf, batchDuration)
>>>>>>     ssc.checkpoint(checkpointDirectory)
>>>>>>
>>>>>>     val topicSet = topic.split(",").toSet
>>>>>>     val groupId = kafkaConf.getOrElse("group.id", "")
>>>>>>
>>>>>>     val directKStream = KafkaUtils.createDirectStream[String, String,
>>>>>> StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
>>>>>>     directKStream.checkpoint(checkpointDuration)
>>>>>>
>>>>>>     val table = ***
>>>>>>
>>>>>>     directKStream.foreachRDD { rdd =>
>>>>>>       val offsetRanges =
>>>>>> rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>>       rdd.flatMap(rec => someFunc(rec))
>>>>>>         .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
>>>>>>         .foreachPartition { partitionRec =>
>>>>>>           val dbWrite = DynamoDBWriter()
>>>>>>           partitionRec.foreach {
>>>>>>             /* Update Dynamo Here */
>>>>>>           }
>>>>>>         }
>>>>>>
>>>>>>       /** Set up ZK Connection **/
>>>>>>       val props = new Properties()
>>>>>>       kafkaConf.foreach(param => props.put(param._1, param._2))
>>>>>>
>>>>>>       props.setProperty(AUTO_OFFSET_COMMIT, "false")
>>>>>>
>>>>>>       val consumerConfig = new ConsumerConfig(props)
>>>>>>       assert(!consumerConfig.autoCommitEnable)
>>>>>>
>>>>>>       val zkClient = new ZkClient(consumerConfig.zkConnect,
>>>>>> consumerConfig.zkSessionTimeoutMs,
>>>>>>         consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>>>>>>
>>>>>>       offsetRanges.foreach { osr =>
>>>>>>         val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
>>>>>>         val zkPath =
>>>>>> s"${topicDirs.consumerOffsetDir}/${osr.partition}"
>>>>>>         ZkUtils.updatePersistentPath(zkClient, zkPath,
>>>>>> osr.untilOffset.toString)
>>>>>>       }
>>>>>>     }
>>>>>>     ssc
>>>>>>   }
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <co...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Sounds like something's not set up right... can you post a minimal
>>>>>>> code example that reproduces the issue?
>>>>>>>
>>>>>>> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <su...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yeah. All messages are lost while the streaming job was down.
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <
>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> Are you actually losing messages then?
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <suchenzang@gmail.com
>>>>>>>>> > wrote:
>>>>>>>>>
>>>>>>>>>> No; first batch only contains messages received after the second
>>>>>>>>>> job starts (messages come in at a steady rate of about 400/second).
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <
>>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does the first batch after restart contain all the messages
>>>>>>>>>>> received while the job was down?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <
>>>>>>>>>>> suchenzang@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm using direct spark streaming (from kafka) with
>>>>>>>>>>>> checkpointing, and
>>>>>>>>>>>> everything works well until a restart. When I shut down (^C)
>>>>>>>>>>>> the first
>>>>>>>>>>>> streaming job, wait 1 minute, then re-submit, there is somehow
>>>>>>>>>>>> a series of 0
>>>>>>>>>>>> event batches that get queued (corresponding to the 1 minute
>>>>>>>>>>>> when the job
>>>>>>>>>>>> was down). Eventually, the batches would resume processing, and
>>>>>>>>>>>> I would see
>>>>>>>>>>>> that each batch has roughly 2000 events.
>>>>>>>>>>>>
>>>>>>>>>>>> I see that at the beginning of the second launch, the
>>>>>>>>>>>> checkpoint dirs are
>>>>>>>>>>>> found and "loaded", according to console output.
>>>>>>>>>>>>
>>>>>>>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>>>>>>>> something
>>>>>>>>>>>> incorrectly, since I would expect with checkpointing that the
>>>>>>>>>>>> streaming job
>>>>>>>>>>>> would resume from checkpoint and continue processing from there
>>>>>>>>>>>> (without
>>>>>>>>>>>> seeing 0 event batches corresponding to when the job was down).
>>>>>>>>>>>>
>>>>>>>>>>>> Also, if I were to wait > 10 minutes or so before re-launching,
>>>>>>>>>>>> there would
>>>>>>>>>>>> be so many 0 event batches that the job would hang. Is this
>>>>>>>>>>>> merely something
>>>>>>>>>>>> to be "waited out", or should I set up some restart
>>>>>>>>>>>> behavior/make a config
>>>>>>>>>>>> change to discard checkpointing if the elapsed time has been
>>>>>>>>>>>> too long?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> <
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
>>>>>>>>>>>> >
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

Posted by Cody Koeninger <co...@koeninger.org>.
I'd be less concerned about what the streaming ui shows than what's
actually going on with the job.  When you say you were losing messages, how
were you observing that?  The UI, or actual job output?

The log lines you posted indicate that the checkpoint was restored and
those offsets were processed; what are the log lines for the following
KafkaRDD ?


On Wed, Aug 26, 2015 at 2:04 PM, Susan Zhang <su...@gmail.com> wrote:

> Compared offsets, and it continues from checkpoint loading:
>
> 15/08/26 11:24:54 INFO
> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
> KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446),
> (install-json,4,825772921,825773536),
> (install-json,1,831654775,831655076),
> (install-json,0,1296018451,1296018810),
> (install-json,2,824785282,824785696), (install-json,3,
> 811428882,811429181)]
>
> 15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json,
> partition 0 offsets 1296018451 -> 1296018810
> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
> partition 4 offsets 825773536 -> 825907428
> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
> partition 2 offsets 824785696 -> 824889957
> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
> partition 3 offsets 811429181 -> 811529084
> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
> partition 1 offsets 831655076 -> 831729964
> ...
>
> But for some reason the streaming UI shows it as computing 0 events.
>
> Removing the call to checkpoint does remove the queueing of 0 event
> batches, since offsets just skip to the latest (checked that the first
> part.fromOffset in the restarted job is larger than the last
> part.untilOffset before restart).
>
>
>
>
> On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> When the kafka rdd is actually being iterated on the worker, there should
>> be an info line of the form
>>
>>     log.info(s"Computing topic ${part.topic}, partition
>> ${part.partition} " +
>>
>>       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>
>>
>> You should be able to compare that to log of offsets during checkpoint
>> loading, to see if they line up.
>>
>> Just out of curiosity, does removing the call to checkpoint on the stream
>> affect anything?
>>
>>
>>
>> On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang <su...@gmail.com>
>> wrote:
>>
>>> Thanks for the suggestions! I tried the following:
>>>
>>> I removed
>>>
>>> createOnError = true
>>>
>>> And reran the same process to reproduce. Double checked that checkpoint
>>> is loading:
>>>
>>> 15/08/26 10:10:40 INFO
>>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>>> KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
>>> (install-json,4,825400856,825401058),
>>> (install-json,1,831453228,831453396),
>>> (install-json,0,1295759888,1295760378),
>>> (install-json,2,824443526,824444409), (install-json,3,
>>> 811222580,811222874)]
>>> 15/08/26 10:10:40 INFO
>>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>>> KafkaRDD for time 1440608830000 ms [(install-json,5,825898528,825898791),
>>> (install-json,4,825401058,825401249),
>>> (install-json,1,831453396,831453603),
>>> (install-json,0,1295760378,1295760809),
>>> (install-json,2,824444409,824445510), (install-json,3,
>>> 811222874,811223285)]
>>> ...
>>>
>>> And the same issue is appearing as before (with 0 event batches getting
>>> queued corresponding to dropped messages). Our kafka brokers are on version
>>> 0.8.2.0, if that makes a difference.
>>>
>>> Also as a sanity check, I took out the ZK updates and reran (just in
>>> case that was somehow causing problems), and that didn't change anything as
>>> expected.
>>>
>>> Furthermore, the 0 event batches seem to take longer to process than
>>> batches with the regular load of events: processing time for 0 event
>>> batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
>>> event batches is consistently < 1s. Why would that happen?
>>>
>>>
>>> As for the checkpoint call:
>>>
>>> directKStream.checkpoint(checkpointDuration)
>>>
>>> was an attempt to set the checkpointing interval (at some multiple of
>>> the batch interval), whereas StreamingContext.checkpoint seems like it will
>>> only set the checkpoint directory.
>>>
>>>
>>>
>>> Thanks for all the help,
>>>
>>> Susan
>>>
>>>
>>> On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> The first thing that stands out to me is
>>>> createOnError = true
>>>>
>>>> Are you sure the checkpoint is actually loading, as opposed to failing
>>>> and starting the job anyway?  There should be info lines that look like
>>>>
>>>> INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
>>>> Restoring KafkaRDD for time 1440597180000 ms [(test,1,162,220)
>>>>
>>>>
>>>> You should be able to tell from those whether the offset ranges being
>>>> loaded from the checkpoint look reasonable.
>>>>
>>>> Also, is there a reason you're calling
>>>>
>>>> directKStream.checkpoint(checkpointDuration)
>>>>
>>>> Just calling checkpoint on the streaming context should be sufficient
>>>> to save the metadata
>>>>
>>>>
>>>>
>>>> On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang <su...@gmail.com>
>>>> wrote:
>>>>
>>>>> Sure thing!
>>>>>
>>>>> The main looks like:
>>>>>
>>>>>
>>>>> --------------------------------------------------------------------------------------------------
>>>>>
>>>>>
>>>>> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>>>>>
>>>>> val kafkaConf = Map(
>>>>>       "zookeeper.connect" -> zookeeper,
>>>>>       "group.id" -> options.group,
>>>>>       "zookeeper.connection.timeout.ms" -> "10000",
>>>>>       "auto.commit.interval.ms" -> "1000",
>>>>>       "rebalance.max.retries" -> "25",
>>>>>       "bootstrap.servers" -> kafkaBrokers
>>>>>     )
>>>>>
>>>>> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>>>>>       () => {
>>>>>         createContext(kafkaConf, checkpointDirectory, topic,
>>>>> numThreads, isProd)
>>>>>       }, createOnError = true)
>>>>>
>>>>> ssc.start()
>>>>> ssc.awaitTermination()
>>>>>
>>>>>
>>>>>
>>>>> --------------------------------------------------------------------------------------------------
>>>>>
>>>>>
>>>>> And createContext is defined as:
>>>>>
>>>>>
>>>>>
>>>>> --------------------------------------------------------------------------------------------------
>>>>>
>>>>>
>>>>> val batchDuration = Seconds(5)
>>>>> val checkpointDuration = Seconds(20)
>>>>>
>>>>> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>>>>>
>>>>> def createContext(kafkaConf: Map[String, String],
>>>>>                     checkpointDirectory: String,
>>>>>                     topic: String,
>>>>>                     numThreads: Int,
>>>>>                     isProd: Boolean)
>>>>>   : StreamingContext = {
>>>>>
>>>>>     val sparkConf = new SparkConf().setAppName("***")
>>>>>     val ssc = new StreamingContext(sparkConf, batchDuration)
>>>>>     ssc.checkpoint(checkpointDirectory)
>>>>>
>>>>>     val topicSet = topic.split(",").toSet
>>>>>     val groupId = kafkaConf.getOrElse("group.id", "")
>>>>>
>>>>>     val directKStream = KafkaUtils.createDirectStream[String, String,
>>>>> StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
>>>>>     directKStream.checkpoint(checkpointDuration)
>>>>>
>>>>>     val table = ***
>>>>>
>>>>>     directKStream.foreachRDD { rdd =>
>>>>>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>       rdd.flatMap(rec => someFunc(rec))
>>>>>         .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
>>>>>         .foreachPartition { partitionRec =>
>>>>>           val dbWrite = DynamoDBWriter()
>>>>>           partitionRec.foreach {
>>>>>             /* Update Dynamo Here */
>>>>>           }
>>>>>         }
>>>>>
>>>>>       /** Set up ZK Connection **/
>>>>>       val props = new Properties()
>>>>>       kafkaConf.foreach(param => props.put(param._1, param._2))
>>>>>
>>>>>       props.setProperty(AUTO_OFFSET_COMMIT, "false")
>>>>>
>>>>>       val consumerConfig = new ConsumerConfig(props)
>>>>>       assert(!consumerConfig.autoCommitEnable)
>>>>>
>>>>>       val zkClient = new ZkClient(consumerConfig.zkConnect,
>>>>> consumerConfig.zkSessionTimeoutMs,
>>>>>         consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>>>>>
>>>>>       offsetRanges.foreach { osr =>
>>>>>         val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
>>>>>         val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
>>>>>         ZkUtils.updatePersistentPath(zkClient, zkPath,
>>>>> osr.untilOffset.toString)
>>>>>       }
>>>>>     }
>>>>>     ssc
>>>>>   }
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Sounds like something's not set up right... can you post a minimal
>>>>>> code example that reproduces the issue?
>>>>>>
>>>>>> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <su...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yeah. All messages are lost while the streaming job was down.
>>>>>>>
>>>>>>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <cody@koeninger.org
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Are you actually losing messages then?
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <su...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> No; first batch only contains messages received after the second
>>>>>>>>> job starts (messages come in at a steady rate of about 400/second).
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <
>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> Does the first batch after restart contain all the messages
>>>>>>>>>> received while the job was down?
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <
>>>>>>>>>> suchenzang@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> I'm using direct spark streaming (from kafka) with
>>>>>>>>>>> checkpointing, and
>>>>>>>>>>> everything works well until a restart. When I shut down (^C) the
>>>>>>>>>>> first
>>>>>>>>>>> streaming job, wait 1 minute, then re-submit, there is somehow a
>>>>>>>>>>> series of 0
>>>>>>>>>>> event batches that get queued (corresponding to the 1 minute
>>>>>>>>>>> when the job
>>>>>>>>>>> was down). Eventually, the batches would resume processing, and
>>>>>>>>>>> I would see
>>>>>>>>>>> that each batch has roughly 2000 events.
>>>>>>>>>>>
>>>>>>>>>>> I see that at the beginning of the second launch, the checkpoint
>>>>>>>>>>> dirs are
>>>>>>>>>>> found and "loaded", according to console output.
>>>>>>>>>>>
>>>>>>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>>>>>>> something
>>>>>>>>>>> incorrectly, since I would expect with checkpointing that the
>>>>>>>>>>> streaming job
>>>>>>>>>>> would resume from checkpoint and continue processing from there
>>>>>>>>>>> (without
>>>>>>>>>>> seeing 0 event batches corresponding to when the job was down).
>>>>>>>>>>>
>>>>>>>>>>> Also, if I were to wait > 10 minutes or so before re-launching,
>>>>>>>>>>> there would
>>>>>>>>>>> be so many 0 event batches that the job would hang. Is this
>>>>>>>>>>> merely something
>>>>>>>>>>> to be "waited out", or should I set up some restart
>>>>>>>>>>> behavior/make a config
>>>>>>>>>>> change to discard checkpointing if the elapsed time has been too
>>>>>>>>>>> long?
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> <
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> View this message in context:
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

Posted by Susan Zhang <su...@gmail.com>.
Compared offsets, and it continues from checkpoint loading:

15/08/26 11:24:54 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446),
(install-json,4,825772921,825773536), (install-json,1,831654775,831655076),
(install-json,0,1296018451,1296018810),
(install-json,2,824785282,824785696), (install-json,3,
811428882,811429181)]

15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json,
partition 0 offsets 1296018451 -> 1296018810
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
partition 4 offsets 825773536 -> 825907428
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
partition 2 offsets 824785696 -> 824889957
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
partition 3 offsets 811429181 -> 811529084
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
partition 1 offsets 831655076 -> 831729964
...

But for some reason the streaming UI shows it as computing 0 events.

Removing the call to checkpoint does remove the queueing of 0 event
batches, since offsets just skip to the latest (checked that the first
part.fromOffset in the restarted job is larger than the last
part.untilOffset before restart).




On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger <co...@koeninger.org> wrote:

> When the kafka rdd is actually being iterated on the worker, there should
> be an info line of the form
>
>     log.info(s"Computing topic ${part.topic}, partition ${part.partition}
> " +
>
>       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>
>
> You should be able to compare that to log of offsets during checkpoint
> loading, to see if they line up.
>
> Just out of curiosity, does removing the call to checkpoint on the stream
> affect anything?
>
>
>
> On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang <su...@gmail.com> wrote:
>
>> Thanks for the suggestions! I tried the following:
>>
>> I removed
>>
>> createOnError = true
>>
>> And reran the same process to reproduce. Double checked that checkpoint
>> is loading:
>>
>> 15/08/26 10:10:40 INFO
>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>> KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
>> (install-json,4,825400856,825401058),
>> (install-json,1,831453228,831453396),
>> (install-json,0,1295759888,1295760378),
>> (install-json,2,824443526,824444409), (install-json,3,
>> 811222580,811222874)]
>> 15/08/26 10:10:40 INFO
>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>> KafkaRDD for time 1440608830000 ms [(install-json,5,825898528,825898791),
>> (install-json,4,825401058,825401249),
>> (install-json,1,831453396,831453603),
>> (install-json,0,1295760378,1295760809),
>> (install-json,2,824444409,824445510), (install-json,3,
>> 811222874,811223285)]
>> ...
>>
>> And the same issue is appearing as before (with 0 event batches getting
>> queued corresponding to dropped messages). Our kafka brokers are on version
>> 0.8.2.0, if that makes a difference.
>>
>> Also as a sanity check, I took out the ZK updates and reran (just in case
>> that was somehow causing problems), and that didn't change anything as
>> expected.
>>
>> Furthermore, the 0 event batches seem to take longer to process than
>> batches with the regular load of events: processing time for 0 event
>> batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
>> event batches is consistently < 1s. Why would that happen?
>>
>>
>> As for the checkpoint call:
>>
>> directKStream.checkpoint(checkpointDuration)
>>
>> was an attempt to set the checkpointing interval (at some multiple of the
>> batch interval), whereas StreamingContext.checkpoint seems like it will
>> only set the checkpoint directory.
>>
>>
>>
>> Thanks for all the help,
>>
>> Susan
>>
>>
>> On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> The first thing that stands out to me is
>>> createOnError = true
>>>
>>> Are you sure the checkpoint is actually loading, as opposed to failing
>>> and starting the job anyway?  There should be info lines that look like
>>>
>>> INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
>>> Restoring KafkaRDD for time 1440597180000 ms [(test,1,162,220)
>>>
>>>
>>> You should be able to tell from those whether the offset ranges being
>>> loaded from the checkpoint look reasonable.
>>>
>>> Also, is there a reason you're calling
>>>
>>> directKStream.checkpoint(checkpointDuration)
>>>
>>> Just calling checkpoint on the streaming context should be sufficient to
>>> save the metadata
>>>
>>>
>>>
>>> On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang <su...@gmail.com>
>>> wrote:
>>>
>>>> Sure thing!
>>>>
>>>> The main looks like:
>>>>
>>>>
>>>> --------------------------------------------------------------------------------------------------
>>>>
>>>>
>>>> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>>>>
>>>> val kafkaConf = Map(
>>>>       "zookeeper.connect" -> zookeeper,
>>>>       "group.id" -> options.group,
>>>>       "zookeeper.connection.timeout.ms" -> "10000",
>>>>       "auto.commit.interval.ms" -> "1000",
>>>>       "rebalance.max.retries" -> "25",
>>>>       "bootstrap.servers" -> kafkaBrokers
>>>>     )
>>>>
>>>> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>>>>       () => {
>>>>         createContext(kafkaConf, checkpointDirectory, topic,
>>>> numThreads, isProd)
>>>>       }, createOnError = true)
>>>>
>>>> ssc.start()
>>>> ssc.awaitTermination()
>>>>
>>>>
>>>>
>>>> --------------------------------------------------------------------------------------------------
>>>>
>>>>
>>>> And createContext is defined as:
>>>>
>>>>
>>>>
>>>> --------------------------------------------------------------------------------------------------
>>>>
>>>>
>>>> val batchDuration = Seconds(5)
>>>> val checkpointDuration = Seconds(20)
>>>>
>>>> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>>>>
>>>> def createContext(kafkaConf: Map[String, String],
>>>>                     checkpointDirectory: String,
>>>>                     topic: String,
>>>>                     numThreads: Int,
>>>>                     isProd: Boolean)
>>>>   : StreamingContext = {
>>>>
>>>>     val sparkConf = new SparkConf().setAppName("***")
>>>>     val ssc = new StreamingContext(sparkConf, batchDuration)
>>>>     ssc.checkpoint(checkpointDirectory)
>>>>
>>>>     val topicSet = topic.split(",").toSet
>>>>     val groupId = kafkaConf.getOrElse("group.id", "")
>>>>
>>>>     val directKStream = KafkaUtils.createDirectStream[String, String,
>>>> StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
>>>>     directKStream.checkpoint(checkpointDuration)
>>>>
>>>>     val table = ***
>>>>
>>>>     directKStream.foreachRDD { rdd =>
>>>>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>       rdd.flatMap(rec => someFunc(rec))
>>>>         .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
>>>>         .foreachPartition { partitionRec =>
>>>>           val dbWrite = DynamoDBWriter()
>>>>           partitionRec.foreach {
>>>>             /* Update Dynamo Here */
>>>>           }
>>>>         }
>>>>
>>>>       /** Set up ZK Connection **/
>>>>       val props = new Properties()
>>>>       kafkaConf.foreach(param => props.put(param._1, param._2))
>>>>
>>>>       props.setProperty(AUTO_OFFSET_COMMIT, "false")
>>>>
>>>>       val consumerConfig = new ConsumerConfig(props)
>>>>       assert(!consumerConfig.autoCommitEnable)
>>>>
>>>>       val zkClient = new ZkClient(consumerConfig.zkConnect,
>>>> consumerConfig.zkSessionTimeoutMs,
>>>>         consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>>>>
>>>>       offsetRanges.foreach { osr =>
>>>>         val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
>>>>         val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
>>>>         ZkUtils.updatePersistentPath(zkClient, zkPath,
>>>> osr.untilOffset.toString)
>>>>       }
>>>>     }
>>>>     ssc
>>>>   }
>>>>
>>>>
>>>>
>>>> On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Sounds like something's not set up right... can you post a minimal
>>>>> code example that reproduces the issue?
>>>>>
>>>>> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <su...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yeah. All messages are lost while the streaming job was down.
>>>>>>
>>>>>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <co...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Are you actually losing messages then?
>>>>>>>
>>>>>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <su...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> No; first batch only contains messages received after the second
>>>>>>>> job starts (messages come in at a steady rate of about 400/second).
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <
>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> Does the first batch after restart contain all the messages
>>>>>>>>> received while the job was down?
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <suchenzang@gmail.com
>>>>>>>>> > wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> I'm using direct spark streaming (from kafka) with checkpointing,
>>>>>>>>>> and
>>>>>>>>>> everything works well until a restart. When I shut down (^C) the
>>>>>>>>>> first
>>>>>>>>>> streaming job, wait 1 minute, then re-submit, there is somehow a
>>>>>>>>>> series of 0
>>>>>>>>>> event batches that get queued (corresponding to the 1 minute when
>>>>>>>>>> the job
>>>>>>>>>> was down). Eventually, the batches would resume processing, and I
>>>>>>>>>> would see
>>>>>>>>>> that each batch has roughly 2000 events.
>>>>>>>>>>
>>>>>>>>>> I see that at the beginning of the second launch, the checkpoint
>>>>>>>>>> dirs are
>>>>>>>>>> found and "loaded", according to console output.
>>>>>>>>>>
>>>>>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>>>>>> something
>>>>>>>>>> incorrectly, since I would expect with checkpointing that the
>>>>>>>>>> streaming job
>>>>>>>>>> would resume from checkpoint and continue processing from there
>>>>>>>>>> (without
>>>>>>>>>> seeing 0 event batches corresponding to when the job was down).
>>>>>>>>>>
>>>>>>>>>> Also, if I were to wait > 10 minutes or so before re-launching,
>>>>>>>>>> there would
>>>>>>>>>> be so many 0 event batches that the job would hang. Is this
>>>>>>>>>> merely something
>>>>>>>>>> to be "waited out", or should I set up some restart behavior/make
>>>>>>>>>> a config
>>>>>>>>>> change to discard checkpointing if the elapsed time has been too
>>>>>>>>>> long?
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> <
>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> View this message in context:
>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>> Nabble.com.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

Posted by Cody Koeninger <co...@koeninger.org>.
When the kafka rdd is actually being iterated on the worker, there should
be an info line of the form

    log.info(s"Computing topic ${part.topic}, partition ${part.partition} "
+

      s"offsets ${part.fromOffset} -> ${part.untilOffset}")


You should be able to compare that to log of offsets during checkpoint
loading, to see if they line up.

Just out of curiosity, does removing the call to checkpoint on the stream
affect anything?



On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang <su...@gmail.com> wrote:

> Thanks for the suggestions! I tried the following:
>
> I removed
>
> createOnError = true
>
> And reran the same process to reproduce. Double checked that checkpoint is
> loading:
>
> 15/08/26 10:10:40 INFO
> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
> KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
> (install-json,4,825400856,825401058),
> (install-json,1,831453228,831453396),
> (install-json,0,1295759888,1295760378),
> (install-json,2,824443526,824444409), (install-json,3,
> 811222580,811222874)]
> 15/08/26 10:10:40 INFO
> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
> KafkaRDD for time 1440608830000 ms [(install-json,5,825898528,825898791),
> (install-json,4,825401058,825401249),
> (install-json,1,831453396,831453603),
> (install-json,0,1295760378,1295760809),
> (install-json,2,824444409,824445510), (install-json,3,
> 811222874,811223285)]
> ...
>
> And the same issue is appearing as before (with 0 event batches getting
> queued corresponding to dropped messages). Our kafka brokers are on version
> 0.8.2.0, if that makes a difference.
>
> Also as a sanity check, I took out the ZK updates and reran (just in case
> that was somehow causing problems), and that didn't change anything as
> expected.
>
> Furthermore, the 0 event batches seem to take longer to process than
> batches with the regular load of events: processing time for 0 event
> batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
> event batches is consistently < 1s. Why would that happen?
>
>
> As for the checkpoint call:
>
> directKStream.checkpoint(checkpointDuration)
>
> was an attempt to set the checkpointing interval (at some multiple of the
> batch interval), whereas StreamingContext.checkpoint seems like it will
> only set the checkpoint directory.
>
>
>
> Thanks for all the help,
>
> Susan
>
>
> On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> The first thing that stands out to me is
>> createOnError = true
>>
>> Are you sure the checkpoint is actually loading, as opposed to failing
>> and starting the job anyway?  There should be info lines that look like
>>
>> INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
>> Restoring KafkaRDD for time 1440597180000 ms [(test,1,162,220)
>>
>>
>> You should be able to tell from those whether the offset ranges being
>> loaded from the checkpoint look reasonable.
>>
>> Also, is there a reason you're calling
>>
>> directKStream.checkpoint(checkpointDuration)
>>
>> Just calling checkpoint on the streaming context should be sufficient to
>> save the metadata
>>
>>
>>
>> On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang <su...@gmail.com>
>> wrote:
>>
>>> Sure thing!
>>>
>>> The main looks like:
>>>
>>>
>>> --------------------------------------------------------------------------------------------------
>>>
>>>
>>> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>>>
>>> val kafkaConf = Map(
>>>       "zookeeper.connect" -> zookeeper,
>>>       "group.id" -> options.group,
>>>       "zookeeper.connection.timeout.ms" -> "10000",
>>>       "auto.commit.interval.ms" -> "1000",
>>>       "rebalance.max.retries" -> "25",
>>>       "bootstrap.servers" -> kafkaBrokers
>>>     )
>>>
>>> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>>>       () => {
>>>         createContext(kafkaConf, checkpointDirectory, topic, numThreads,
>>> isProd)
>>>       }, createOnError = true)
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>>
>>>
>>> --------------------------------------------------------------------------------------------------
>>>
>>>
>>> And createContext is defined as:
>>>
>>>
>>>
>>> --------------------------------------------------------------------------------------------------
>>>
>>>
>>> val batchDuration = Seconds(5)
>>> val checkpointDuration = Seconds(20)
>>>
>>> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>>>
>>> def createContext(kafkaConf: Map[String, String],
>>>                     checkpointDirectory: String,
>>>                     topic: String,
>>>                     numThreads: Int,
>>>                     isProd: Boolean)
>>>   : StreamingContext = {
>>>
>>>     val sparkConf = new SparkConf().setAppName("***")
>>>     val ssc = new StreamingContext(sparkConf, batchDuration)
>>>     ssc.checkpoint(checkpointDirectory)
>>>
>>>     val topicSet = topic.split(",").toSet
>>>     val groupId = kafkaConf.getOrElse("group.id", "")
>>>
>>>     val directKStream = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
>>>     directKStream.checkpoint(checkpointDuration)
>>>
>>>     val table = ***
>>>
>>>     directKStream.foreachRDD { rdd =>
>>>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>       rdd.flatMap(rec => someFunc(rec))
>>>         .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
>>>         .foreachPartition { partitionRec =>
>>>           val dbWrite = DynamoDBWriter()
>>>           partitionRec.foreach {
>>>             /* Update Dynamo Here */
>>>           }
>>>         }
>>>
>>>       /** Set up ZK Connection **/
>>>       val props = new Properties()
>>>       kafkaConf.foreach(param => props.put(param._1, param._2))
>>>
>>>       props.setProperty(AUTO_OFFSET_COMMIT, "false")
>>>
>>>       val consumerConfig = new ConsumerConfig(props)
>>>       assert(!consumerConfig.autoCommitEnable)
>>>
>>>       val zkClient = new ZkClient(consumerConfig.zkConnect,
>>> consumerConfig.zkSessionTimeoutMs,
>>>         consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>>>
>>>       offsetRanges.foreach { osr =>
>>>         val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
>>>         val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
>>>         ZkUtils.updatePersistentPath(zkClient, zkPath,
>>> osr.untilOffset.toString)
>>>       }
>>>     }
>>>     ssc
>>>   }
>>>
>>>
>>>
>>> On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> Sounds like something's not set up right... can you post a minimal code
>>>> example that reproduces the issue?
>>>>
>>>> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <su...@gmail.com>
>>>> wrote:
>>>>
>>>>> Yeah. All messages are lost while the streaming job was down.
>>>>>
>>>>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Are you actually losing messages then?
>>>>>>
>>>>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <su...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> No; first batch only contains messages received after the second job
>>>>>>> starts (messages come in at a steady rate of about 400/second).
>>>>>>>
>>>>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <cody@koeninger.org
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Does the first batch after restart contain all the messages
>>>>>>>> received while the job was down?
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <su...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I'm using direct spark streaming (from kafka) with checkpointing,
>>>>>>>>> and
>>>>>>>>> everything works well until a restart. When I shut down (^C) the
>>>>>>>>> first
>>>>>>>>> streaming job, wait 1 minute, then re-submit, there is somehow a
>>>>>>>>> series of 0
>>>>>>>>> event batches that get queued (corresponding to the 1 minute when
>>>>>>>>> the job
>>>>>>>>> was down). Eventually, the batches would resume processing, and I
>>>>>>>>> would see
>>>>>>>>> that each batch has roughly 2000 events.
>>>>>>>>>
>>>>>>>>> I see that at the beginning of the second launch, the checkpoint
>>>>>>>>> dirs are
>>>>>>>>> found and "loaded", according to console output.
>>>>>>>>>
>>>>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>>>>> something
>>>>>>>>> incorrectly, since I would expect with checkpointing that the
>>>>>>>>> streaming job
>>>>>>>>> would resume from checkpoint and continue processing from there
>>>>>>>>> (without
>>>>>>>>> seeing 0 event batches corresponding to when the job was down).
>>>>>>>>>
>>>>>>>>> Also, if I were to wait > 10 minutes or so before re-launching,
>>>>>>>>> there would
>>>>>>>>> be so many 0 event batches that the job would hang. Is this merely
>>>>>>>>> something
>>>>>>>>> to be "waited out", or should I set up some restart behavior/make
>>>>>>>>> a config
>>>>>>>>> change to discard checkpointing if the elapsed time has been too
>>>>>>>>> long?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> <
>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> View this message in context:
>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>> Nabble.com.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

Posted by Susan Zhang <su...@gmail.com>.
Thanks for the suggestions! I tried the following:

I removed

createOnError = true

And reran the same process to reproduce. Double checked that checkpoint is
loading:

15/08/26 10:10:40 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
(install-json,4,825400856,825401058), (install-json,1,831453228,831453396),
(install-json,0,1295759888,1295760378),
(install-json,2,824443526,824444409), (install-json,3,
811222580,811222874)]
15/08/26 10:10:40 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 1440608830000 ms [(install-json,5,825898528,825898791),
(install-json,4,825401058,825401249), (install-json,1,831453396,831453603),
(install-json,0,1295760378,1295760809),
(install-json,2,824444409,824445510), (install-json,3,
811222874,811223285)]
...

And the same issue is appearing as before (with 0 event batches getting
queued corresponding to dropped messages). Our kafka brokers are on version
0.8.2.0, if that makes a difference.

Also as a sanity check, I took out the ZK updates and reran (just in case
that was somehow causing problems), and that didn't change anything as
expected.

Furthermore, the 0 event batches seem to take longer to process than
batches with the regular load of events: processing time for 0 event
batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
event batches is consistently < 1s. Why would that happen?


As for the checkpoint call:

directKStream.checkpoint(checkpointDuration)

was an attempt to set the checkpointing interval (at some multiple of the
batch interval), whereas StreamingContext.checkpoint seems like it will
only set the checkpoint directory.



Thanks for all the help,

Susan


On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger <co...@koeninger.org> wrote:

> The first thing that stands out to me is
> createOnError = true
>
> Are you sure the checkpoint is actually loading, as opposed to failing and
> starting the job anyway?  There should be info lines that look like
>
> INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
> Restoring KafkaRDD for time 1440597180000 ms [(test,1,162,220)
>
>
> You should be able to tell from those whether the offset ranges being
> loaded from the checkpoint look reasonable.
>
> Also, is there a reason you're calling
>
> directKStream.checkpoint(checkpointDuration)
>
> Just calling checkpoint on the streaming context should be sufficient to
> save the metadata
>
>
>
> On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang <su...@gmail.com> wrote:
>
>> Sure thing!
>>
>> The main looks like:
>>
>>
>> --------------------------------------------------------------------------------------------------
>>
>>
>> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>>
>> val kafkaConf = Map(
>>       "zookeeper.connect" -> zookeeper,
>>       "group.id" -> options.group,
>>       "zookeeper.connection.timeout.ms" -> "10000",
>>       "auto.commit.interval.ms" -> "1000",
>>       "rebalance.max.retries" -> "25",
>>       "bootstrap.servers" -> kafkaBrokers
>>     )
>>
>> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>>       () => {
>>         createContext(kafkaConf, checkpointDirectory, topic, numThreads,
>> isProd)
>>       }, createOnError = true)
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>>
>>
>> --------------------------------------------------------------------------------------------------
>>
>>
>> And createContext is defined as:
>>
>>
>>
>> --------------------------------------------------------------------------------------------------
>>
>>
>> val batchDuration = Seconds(5)
>> val checkpointDuration = Seconds(20)
>>
>> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>>
>> def createContext(kafkaConf: Map[String, String],
>>                     checkpointDirectory: String,
>>                     topic: String,
>>                     numThreads: Int,
>>                     isProd: Boolean)
>>   : StreamingContext = {
>>
>>     val sparkConf = new SparkConf().setAppName("***")
>>     val ssc = new StreamingContext(sparkConf, batchDuration)
>>     ssc.checkpoint(checkpointDirectory)
>>
>>     val topicSet = topic.split(",").toSet
>>     val groupId = kafkaConf.getOrElse("group.id", "")
>>
>>     val directKStream = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
>>     directKStream.checkpoint(checkpointDuration)
>>
>>     val table = ***
>>
>>     directKStream.foreachRDD { rdd =>
>>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>       rdd.flatMap(rec => someFunc(rec))
>>         .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
>>         .foreachPartition { partitionRec =>
>>           val dbWrite = DynamoDBWriter()
>>           partitionRec.foreach {
>>             /* Update Dynamo Here */
>>           }
>>         }
>>
>>       /** Set up ZK Connection **/
>>       val props = new Properties()
>>       kafkaConf.foreach(param => props.put(param._1, param._2))
>>
>>       props.setProperty(AUTO_OFFSET_COMMIT, "false")
>>
>>       val consumerConfig = new ConsumerConfig(props)
>>       assert(!consumerConfig.autoCommitEnable)
>>
>>       val zkClient = new ZkClient(consumerConfig.zkConnect,
>> consumerConfig.zkSessionTimeoutMs,
>>         consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>>
>>       offsetRanges.foreach { osr =>
>>         val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
>>         val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
>>         ZkUtils.updatePersistentPath(zkClient, zkPath,
>> osr.untilOffset.toString)
>>       }
>>     }
>>     ssc
>>   }
>>
>>
>>
>> On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Sounds like something's not set up right... can you post a minimal code
>>> example that reproduces the issue?
>>>
>>> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <su...@gmail.com>
>>> wrote:
>>>
>>>> Yeah. All messages are lost while the streaming job was down.
>>>>
>>>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Are you actually losing messages then?
>>>>>
>>>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <su...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> No; first batch only contains messages received after the second job
>>>>>> starts (messages come in at a steady rate of about 400/second).
>>>>>>
>>>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <co...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Does the first batch after restart contain all the messages received
>>>>>>> while the job was down?
>>>>>>>
>>>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <su...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I'm using direct spark streaming (from kafka) with checkpointing,
>>>>>>>> and
>>>>>>>> everything works well until a restart. When I shut down (^C) the
>>>>>>>> first
>>>>>>>> streaming job, wait 1 minute, then re-submit, there is somehow a
>>>>>>>> series of 0
>>>>>>>> event batches that get queued (corresponding to the 1 minute when
>>>>>>>> the job
>>>>>>>> was down). Eventually, the batches would resume processing, and I
>>>>>>>> would see
>>>>>>>> that each batch has roughly 2000 events.
>>>>>>>>
>>>>>>>> I see that at the beginning of the second launch, the checkpoint
>>>>>>>> dirs are
>>>>>>>> found and "loaded", according to console output.
>>>>>>>>
>>>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>>>> something
>>>>>>>> incorrectly, since I would expect with checkpointing that the
>>>>>>>> streaming job
>>>>>>>> would resume from checkpoint and continue processing from there
>>>>>>>> (without
>>>>>>>> seeing 0 event batches corresponding to when the job was down).
>>>>>>>>
>>>>>>>> Also, if I were to wait > 10 minutes or so before re-launching,
>>>>>>>> there would
>>>>>>>> be so many 0 event batches that the job would hang. Is this merely
>>>>>>>> something
>>>>>>>> to be "waited out", or should I set up some restart behavior/make a
>>>>>>>> config
>>>>>>>> change to discard checkpointing if the elapsed time has been too
>>>>>>>> long?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> <
>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
>>>>>>>> >
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> View this message in context:
>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>> Nabble.com.
>>>>>>>>
>>>>>>>>
>>>>>>>> ---------------------------------------------------------------------
>>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

Posted by Cody Koeninger <co...@koeninger.org>.
The first thing that stands out to me is
createOnError = true

Are you sure the checkpoint is actually loading, as opposed to failing and
starting the job anyway?  There should be info lines that look like

INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
Restoring KafkaRDD for time 1440597180000 ms [(test,1,162,220)


You should be able to tell from those whether the offset ranges being
loaded from the checkpoint look reasonable.

Also, is there a reason you're calling

directKStream.checkpoint(checkpointDuration)

Just calling checkpoint on the streaming context should be sufficient to
save the metadata



On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang <su...@gmail.com> wrote:

> Sure thing!
>
> The main looks like:
>
>
> --------------------------------------------------------------------------------------------------
>
>
> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>
> val kafkaConf = Map(
>       "zookeeper.connect" -> zookeeper,
>       "group.id" -> options.group,
>       "zookeeper.connection.timeout.ms" -> "10000",
>       "auto.commit.interval.ms" -> "1000",
>       "rebalance.max.retries" -> "25",
>       "bootstrap.servers" -> kafkaBrokers
>     )
>
> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>       () => {
>         createContext(kafkaConf, checkpointDirectory, topic, numThreads,
> isProd)
>       }, createOnError = true)
>
> ssc.start()
> ssc.awaitTermination()
>
>
>
> --------------------------------------------------------------------------------------------------
>
>
> And createContext is defined as:
>
>
>
> --------------------------------------------------------------------------------------------------
>
>
> val batchDuration = Seconds(5)
> val checkpointDuration = Seconds(20)
>
> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>
> def createContext(kafkaConf: Map[String, String],
>                     checkpointDirectory: String,
>                     topic: String,
>                     numThreads: Int,
>                     isProd: Boolean)
>   : StreamingContext = {
>
>     val sparkConf = new SparkConf().setAppName("***")
>     val ssc = new StreamingContext(sparkConf, batchDuration)
>     ssc.checkpoint(checkpointDirectory)
>
>     val topicSet = topic.split(",").toSet
>     val groupId = kafkaConf.getOrElse("group.id", "")
>
>     val directKStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
>     directKStream.checkpoint(checkpointDuration)
>
>     val table = ***
>
>     directKStream.foreachRDD { rdd =>
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       rdd.flatMap(rec => someFunc(rec))
>         .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
>         .foreachPartition { partitionRec =>
>           val dbWrite = DynamoDBWriter()
>           partitionRec.foreach {
>             /* Update Dynamo Here */
>           }
>         }
>
>       /** Set up ZK Connection **/
>       val props = new Properties()
>       kafkaConf.foreach(param => props.put(param._1, param._2))
>
>       props.setProperty(AUTO_OFFSET_COMMIT, "false")
>
>       val consumerConfig = new ConsumerConfig(props)
>       assert(!consumerConfig.autoCommitEnable)
>
>       val zkClient = new ZkClient(consumerConfig.zkConnect,
> consumerConfig.zkSessionTimeoutMs,
>         consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>
>       offsetRanges.foreach { osr =>
>         val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
>         val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
>         ZkUtils.updatePersistentPath(zkClient, zkPath,
> osr.untilOffset.toString)
>       }
>     }
>     ssc
>   }
>
>
>
> On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> Sounds like something's not set up right... can you post a minimal code
>> example that reproduces the issue?
>>
>> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <su...@gmail.com>
>> wrote:
>>
>>> Yeah. All messages are lost while the streaming job was down.
>>>
>>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> Are you actually losing messages then?
>>>>
>>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <su...@gmail.com>
>>>> wrote:
>>>>
>>>>> No; first batch only contains messages received after the second job
>>>>> starts (messages come in at a steady rate of about 400/second).
>>>>>
>>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Does the first batch after restart contain all the messages received
>>>>>> while the job was down?
>>>>>>
>>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <su...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm using direct spark streaming (from kafka) with checkpointing, and
>>>>>>> everything works well until a restart. When I shut down (^C) the
>>>>>>> first
>>>>>>> streaming job, wait 1 minute, then re-submit, there is somehow a
>>>>>>> series of 0
>>>>>>> event batches that get queued (corresponding to the 1 minute when
>>>>>>> the job
>>>>>>> was down). Eventually, the batches would resume processing, and I
>>>>>>> would see
>>>>>>> that each batch has roughly 2000 events.
>>>>>>>
>>>>>>> I see that at the beginning of the second launch, the checkpoint
>>>>>>> dirs are
>>>>>>> found and "loaded", according to console output.
>>>>>>>
>>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>>> something
>>>>>>> incorrectly, since I would expect with checkpointing that the
>>>>>>> streaming job
>>>>>>> would resume from checkpoint and continue processing from there
>>>>>>> (without
>>>>>>> seeing 0 event batches corresponding to when the job was down).
>>>>>>>
>>>>>>> Also, if I were to wait > 10 minutes or so before re-launching,
>>>>>>> there would
>>>>>>> be so many 0 event batches that the job would hang. Is this merely
>>>>>>> something
>>>>>>> to be "waited out", or should I set up some restart behavior/make a
>>>>>>> config
>>>>>>> change to discard checkpointing if the elapsed time has been too
>>>>>>> long?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> <
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
>>>>>>> >
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

Posted by Susan Zhang <su...@gmail.com>.
Sure thing!

The main looks like:

--------------------------------------------------------------------------------------------------


val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")

val kafkaConf = Map(
      "zookeeper.connect" -> zookeeper,
      "group.id" -> options.group,
      "zookeeper.connection.timeout.ms" -> "10000",
      "auto.commit.interval.ms" -> "1000",
      "rebalance.max.retries" -> "25",
      "bootstrap.servers" -> kafkaBrokers
    )

val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(kafkaConf, checkpointDirectory, topic, numThreads,
isProd)
      }, createOnError = true)

ssc.start()
ssc.awaitTermination()


--------------------------------------------------------------------------------------------------


And createContext is defined as:


--------------------------------------------------------------------------------------------------


val batchDuration = Seconds(5)
val checkpointDuration = Seconds(20)

private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

def createContext(kafkaConf: Map[String, String],
                    checkpointDirectory: String,
                    topic: String,
                    numThreads: Int,
                    isProd: Boolean)
  : StreamingContext = {

    val sparkConf = new SparkConf().setAppName("***")
    val ssc = new StreamingContext(sparkConf, batchDuration)
    ssc.checkpoint(checkpointDirectory)

    val topicSet = topic.split(",").toSet
    val groupId = kafkaConf.getOrElse("group.id", "")

    val directKStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
    directKStream.checkpoint(checkpointDuration)

    val table = ***

    directKStream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.flatMap(rec => someFunc(rec))
        .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
        .foreachPartition { partitionRec =>
          val dbWrite = DynamoDBWriter()
          partitionRec.foreach {
            /* Update Dynamo Here */
          }
        }

      /** Set up ZK Connection **/
      val props = new Properties()
      kafkaConf.foreach(param => props.put(param._1, param._2))

      props.setProperty(AUTO_OFFSET_COMMIT, "false")

      val consumerConfig = new ConsumerConfig(props)
      assert(!consumerConfig.autoCommitEnable)

      val zkClient = new ZkClient(consumerConfig.zkConnect,
consumerConfig.zkSessionTimeoutMs,
        consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

      offsetRanges.foreach { osr =>
        val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
        val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
        ZkUtils.updatePersistentPath(zkClient, zkPath,
osr.untilOffset.toString)
      }
    }
    ssc
  }



On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Sounds like something's not set up right... can you post a minimal code
> example that reproduces the issue?
>
> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <su...@gmail.com> wrote:
>
>> Yeah. All messages are lost while the streaming job was down.
>>
>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Are you actually losing messages then?
>>>
>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <su...@gmail.com>
>>> wrote:
>>>
>>>> No; first batch only contains messages received after the second job
>>>> starts (messages come in at a steady rate of about 400/second).
>>>>
>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Does the first batch after restart contain all the messages received
>>>>> while the job was down?
>>>>>
>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <su...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm using direct spark streaming (from kafka) with checkpointing, and
>>>>>> everything works well until a restart. When I shut down (^C) the first
>>>>>> streaming job, wait 1 minute, then re-submit, there is somehow a
>>>>>> series of 0
>>>>>> event batches that get queued (corresponding to the 1 minute when the
>>>>>> job
>>>>>> was down). Eventually, the batches would resume processing, and I
>>>>>> would see
>>>>>> that each batch has roughly 2000 events.
>>>>>>
>>>>>> I see that at the beginning of the second launch, the checkpoint dirs
>>>>>> are
>>>>>> found and "loaded", according to console output.
>>>>>>
>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>> something
>>>>>> incorrectly, since I would expect with checkpointing that the
>>>>>> streaming job
>>>>>> would resume from checkpoint and continue processing from there
>>>>>> (without
>>>>>> seeing 0 event batches corresponding to when the job was down).
>>>>>>
>>>>>> Also, if I were to wait > 10 minutes or so before re-launching, there
>>>>>> would
>>>>>> be so many 0 event batches that the job would hang. Is this merely
>>>>>> something
>>>>>> to be "waited out", or should I set up some restart behavior/make a
>>>>>> config
>>>>>> change to discard checkpointing if the elapsed time has been too long?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> <
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
>>>>>> >
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

Posted by Cody Koeninger <co...@koeninger.org>.
Sounds like something's not set up right... can you post a minimal code
example that reproduces the issue?

On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <su...@gmail.com> wrote:

> Yeah. All messages are lost while the streaming job was down.
>
> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> Are you actually losing messages then?
>>
>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <su...@gmail.com>
>> wrote:
>>
>>> No; first batch only contains messages received after the second job
>>> starts (messages come in at a steady rate of about 400/second).
>>>
>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> Does the first batch after restart contain all the messages received
>>>> while the job was down?
>>>>
>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <su...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm using direct spark streaming (from kafka) with checkpointing, and
>>>>> everything works well until a restart. When I shut down (^C) the first
>>>>> streaming job, wait 1 minute, then re-submit, there is somehow a
>>>>> series of 0
>>>>> event batches that get queued (corresponding to the 1 minute when the
>>>>> job
>>>>> was down). Eventually, the batches would resume processing, and I
>>>>> would see
>>>>> that each batch has roughly 2000 events.
>>>>>
>>>>> I see that at the beginning of the second launch, the checkpoint dirs
>>>>> are
>>>>> found and "loaded", according to console output.
>>>>>
>>>>> Is this expected behavior? It seems like I might've configured
>>>>> something
>>>>> incorrectly, since I would expect with checkpointing that the
>>>>> streaming job
>>>>> would resume from checkpoint and continue processing from there
>>>>> (without
>>>>> seeing 0 event batches corresponding to when the job was down).
>>>>>
>>>>> Also, if I were to wait > 10 minutes or so before re-launching, there
>>>>> would
>>>>> be so many 0 event batches that the job would hang. Is this merely
>>>>> something
>>>>> to be "waited out", or should I set up some restart behavior/make a
>>>>> config
>>>>> change to discard checkpointing if the elapsed time has been too long?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> <
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
>>>>> >
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

Posted by Susan Zhang <su...@gmail.com>.
Yeah. All messages are lost while the streaming job was down.

On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <co...@koeninger.org> wrote:

> Are you actually losing messages then?
>
> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <su...@gmail.com> wrote:
>
>> No; first batch only contains messages received after the second job
>> starts (messages come in at a steady rate of about 400/second).
>>
>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Does the first batch after restart contain all the messages received
>>> while the job was down?
>>>
>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <su...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm using direct spark streaming (from kafka) with checkpointing, and
>>>> everything works well until a restart. When I shut down (^C) the first
>>>> streaming job, wait 1 minute, then re-submit, there is somehow a series
>>>> of 0
>>>> event batches that get queued (corresponding to the 1 minute when the
>>>> job
>>>> was down). Eventually, the batches would resume processing, and I would
>>>> see
>>>> that each batch has roughly 2000 events.
>>>>
>>>> I see that at the beginning of the second launch, the checkpoint dirs
>>>> are
>>>> found and "loaded", according to console output.
>>>>
>>>> Is this expected behavior? It seems like I might've configured something
>>>> incorrectly, since I would expect with checkpointing that the streaming
>>>> job
>>>> would resume from checkpoint and continue processing from there (without
>>>> seeing 0 event batches corresponding to when the job was down).
>>>>
>>>> Also, if I were to wait > 10 minutes or so before re-launching, there
>>>> would
>>>> be so many 0 event batches that the job would hang. Is this merely
>>>> something
>>>> to be "waited out", or should I set up some restart behavior/make a
>>>> config
>>>> change to discard checkpointing if the elapsed time has been too long?
>>>>
>>>> Thanks!
>>>>
>>>> <
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
>>>> >
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>
>>>>
>>>
>>
>

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

Posted by Cody Koeninger <co...@koeninger.org>.
Are you actually losing messages then?

On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <su...@gmail.com> wrote:

> No; first batch only contains messages received after the second job
> starts (messages come in at a steady rate of about 400/second).
>
> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> Does the first batch after restart contain all the messages received
>> while the job was down?
>>
>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <su...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I'm using direct spark streaming (from kafka) with checkpointing, and
>>> everything works well until a restart. When I shut down (^C) the first
>>> streaming job, wait 1 minute, then re-submit, there is somehow a series
>>> of 0
>>> event batches that get queued (corresponding to the 1 minute when the job
>>> was down). Eventually, the batches would resume processing, and I would
>>> see
>>> that each batch has roughly 2000 events.
>>>
>>> I see that at the beginning of the second launch, the checkpoint dirs are
>>> found and "loaded", according to console output.
>>>
>>> Is this expected behavior? It seems like I might've configured something
>>> incorrectly, since I would expect with checkpointing that the streaming
>>> job
>>> would resume from checkpoint and continue processing from there (without
>>> seeing 0 event batches corresponding to when the job was down).
>>>
>>> Also, if I were to wait > 10 minutes or so before re-launching, there
>>> would
>>> be so many 0 event batches that the job would hang. Is this merely
>>> something
>>> to be "waited out", or should I set up some restart behavior/make a
>>> config
>>> change to discard checkpointing if the elapsed time has been too long?
>>>
>>> Thanks!
>>>
>>> <
>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
>>> >
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>>
>>>
>>
>

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

Posted by Susan Zhang <su...@gmail.com>.
No; first batch only contains messages received after the second job starts
(messages come in at a steady rate of about 400/second).

On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <co...@koeninger.org> wrote:

> Does the first batch after restart contain all the messages received while
> the job was down?
>
> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <su...@gmail.com> wrote:
>
>> Hello,
>>
>> I'm using direct spark streaming (from kafka) with checkpointing, and
>> everything works well until a restart. When I shut down (^C) the first
>> streaming job, wait 1 minute, then re-submit, there is somehow a series
>> of 0
>> event batches that get queued (corresponding to the 1 minute when the job
>> was down). Eventually, the batches would resume processing, and I would
>> see
>> that each batch has roughly 2000 events.
>>
>> I see that at the beginning of the second launch, the checkpoint dirs are
>> found and "loaded", according to console output.
>>
>> Is this expected behavior? It seems like I might've configured something
>> incorrectly, since I would expect with checkpointing that the streaming
>> job
>> would resume from checkpoint and continue processing from there (without
>> seeing 0 event batches corresponding to when the job was down).
>>
>> Also, if I were to wait > 10 minutes or so before re-launching, there
>> would
>> be so many 0 event batches that the job would hang. Is this merely
>> something
>> to be "waited out", or should I set up some restart behavior/make a config
>> change to discard checkpointing if the elapsed time has been too long?
>>
>> Thanks!
>>
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
>> >
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

Posted by Cody Koeninger <co...@koeninger.org>.
Does the first batch after restart contain all the messages received while
the job was down?

On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <su...@gmail.com> wrote:

> Hello,
>
> I'm using direct spark streaming (from kafka) with checkpointing, and
> everything works well until a restart. When I shut down (^C) the first
> streaming job, wait 1 minute, then re-submit, there is somehow a series of
> 0
> event batches that get queued (corresponding to the 1 minute when the job
> was down). Eventually, the batches would resume processing, and I would see
> that each batch has roughly 2000 events.
>
> I see that at the beginning of the second launch, the checkpoint dirs are
> found and "loaded", according to console output.
>
> Is this expected behavior? It seems like I might've configured something
> incorrectly, since I would expect with checkpointing that the streaming job
> would resume from checkpoint and continue processing from there (without
> seeing 0 event batches corresponding to when the job was down).
>
> Also, if I were to wait > 10 minutes or so before re-launching, there would
> be so many 0 event batches that the job would hang. Is this merely
> something
> to be "waited out", or should I set up some restart behavior/make a config
> change to discard checkpointing if the elapsed time has been too long?
>
> Thanks!
>
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
> >
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>