Posted to user@spark.apache.org by Nicolas Phung <ni...@gmail.com> on 2015/07/16 18:28:12 UTC

Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Hello,

When I'm reprocessing the data from Kafka (about 40 GB) with the new
Spark Streaming Kafka method createDirectStream, everything is fine
until a driver error happens (driver killed, connection lost, ...).
When the driver comes back up, it resumes processing from the
checkpoint in HDFS. Except I get this:

15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4
times; aborting job
15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job
1437032118000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3
in stage 4.0 (TID 16, slave05.local):
java.lang.IllegalArgumentException
	at java.nio.Buffer.limit(Buffer.java:275)
	at kafka.message.Message.sliceDelimited(Message.scala:236)
	at kafka.message.Message.payload(Message.scala:218)
	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

This happens only when I'm doing a full data reprocessing from Kafka. If
there's no load, when you kill the driver and then restart it, it resumes
from the checkpoint as expected without missing data. Has anyone encountered
something similar? How did you solve it?

Regards,

Nicolas PHUNG

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Cody Koeninger <co...@koeninger.org>.
That seems abnormally large for a checkpoint.  How many topic-partitions do
you have, what kind of batch size do you have, do you have any windowing
operations, and how long had the job been down for?

generatedRDDs holds a map of time => RDD, and that is used as the basis for
what to store in a checkpoint.  Normally that map is filtered to remove
RDDs based on the necessary rememberDuration, which is affected by how long
your window is.

During restart from a checkpoint, generatedRDDs is going to be loaded up
with one RDD for each time point; you should see info logs along the lines
of "Restoring KafkaRDD for time $t"... how many of those do you see?

If the number of time points * number of topic-partitions is really that
big, perhaps you need to give the driver more memory, but that seems
unusual to me.
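
As a rough illustration of that arithmetic, here is a minimal sketch (Spark
1.3-era API; the application name, checkpoint directory, and durations are
placeholder values, not from this thread) of how the batch interval and
remember duration bound the number of retained KafkaRDDs:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf().setAppName("checkpoint-sizing-sketch")
// Each 10-second batch adds one entry to generatedRDDs (time => RDD).
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/checkpoint-sketch")
// Keep RDD metadata for at most 10 minutes: roughly 600s / 10s = 60 time
// points per stream survive the rememberDuration filter, and each retained
// KafkaRDD carries one offset range per topic-partition into the checkpoint.
ssc.remember(Minutes(10))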


On Wed, Jul 29, 2015 at 7:09 AM, Nicolas Phung <ni...@gmail.com>
wrote:

> Hello,
>
> I'm using 4 GB for the driver memory. The checkpoint is between 1 GB and 10
> GB, depending on whether I'm reprocessing all the data from the beginning or
> just picking up from the latest offset of the real-time processing. Is there
> any best practice to be aware of regarding driver memory relative to
> checkpoint size?
>
> Regards,
> Nicolas P.
>
> On Tue, Jul 28, 2015 at 4:42 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> That stacktrace looks like an out of heap space on the driver while
>> writing checkpoint, not on the worker nodes.  How much memory are you
>> giving the driver?  How big are your stored checkpoints?
>>
>> On Tue, Jul 28, 2015 at 9:30 AM, Nicolas Phung <ni...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> After using KafkaUtils.createDirectStream[Object, Object,
>>> KafkaAvroDecoder, KafkaAvroDecoder, Option[AnalyticEventEnriched]](ssc,
>>> kafkaParams, map, messageHandler), I'm encountering the following issue:
>>>
>>> 15/07/28 00:29:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
>>> thread [sparkDriver-akka.actor.default-dispatcher-24] shutting down
>>> ActorSystem [sparkDriver]
>>> java.lang.OutOfMemoryError: Java heap space
>>>     at
>>> java.io.ObjectOutputStream$HandleTable.growEntries(ObjectOutputStream.java:2351)
>>>     at
>>> java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2276)
>>>     at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1428)
>>>     at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at
>>> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>>     at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>>     at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>     at
>>> scala.collection.mutable.HashMap$$anonfun$writeObject$1.apply(HashMap.scala:137)
>>>     at
>>> scala.collection.mutable.HashMap$$anonfun$writeObject$1.apply(HashMap.scala:135)
>>>     at
>>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>>>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>>>     at
>>> scala.collection.mutable.HashTable$class.serializeTo(HashTable.scala:124)
>>>     at scala.collection.mutable.HashMap.serializeTo(HashMap.scala:39)
>>>     at scala.collection.mutable.HashMap.writeObject(HashMap.scala:135)
>>>     at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
>>>     at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:483)
>>>     at
>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>     at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>     at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at
>>> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
>>>     at
>>> org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:128)
>>>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137)
>>>     at
>>> org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123)
>>>     at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
>>>     at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> I don't know why, but after that it eats all the CPU on one of the nodes
>>> until the entire job stops. It tries to resume from the checkpoint several
>>> times but fails with this error too. I think I have enough spare memory:
>>> 4 nodes with 24 GB per node. It had successfully processed around 40 GB
>>> before that, and looking at Storage in the Spark UI, I don't have a big RDD
>>> stored in memory/disk. I also notice an increasing number of unclosed Kafka
>>> connections on this node.
>>>
>>> Regards,
>>> Nicolas P.
>>>
>>> On Fri, Jul 24, 2015 at 3:32 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> It's really a question of whether you need access to the
>>>> MessageAndMetadata, or just the key / value from the message.
>>>>
>>>> If you just need the key/value, dstream map is fine.
>>>>
>>>> In your case, since you need to be able to control a possible failure
>>>> when deserializing the message from the MessageAndMetadata, I'd just go
>>>> ahead and do the work in the messageHandler.
>>>>
>>>> On Fri, Jul 24, 2015 at 2:46 AM, Nicolas Phung <nicolas.phung@gmail.com
>>>> > wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I managed to read all my data back by skipping the offsets that contain a
>>>>> corrupt message. I have one more question on best practices: messageHandler
>>>>> vs dstream.foreachRDD.map vs dstream.map.foreachRDD. I'm using a function to
>>>>> read the serialized message from Kafka, convert it into my appropriate object
>>>>> with some enrichments, and sometimes add a filter after that. Where's the best
>>>>> spot to put this logic: inside the messageHandler method (convert each message
>>>>> within the handler), dstream.foreachRDD.map (map the RDD), or
>>>>> dstream.map.foreachRDD (map the DStream)?
>>>>>
>>>>> Thank you for your help Cody.
>>>>> Regards,
>>>>> Nicolas PHUNG
>>>>>
>>>>> On Tue, Jul 21, 2015 at 4:53 PM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Yeah, I'm referring to that api.
>>>>>>
>>>>>> If you want to filter messages in addition to catching that
>>>>>> exception, have your messageHandler return an Option, so the type R would
>>>>>> end up being Option[WhateverYourClassIs]; then filter out None before doing
>>>>>> the rest of your processing.
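
A minimal sketch of that pattern (Spark 1.3-era API; AnalyticEvent and decode
below are hypothetical stand-ins for your event class and deserializer, and
ssc, kafkaParams, and fromOffsets are assumed from the surrounding discussion):

import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

case class AnalyticEvent(raw: Array[Byte])                            // hypothetical event type
def decode(bytes: Array[Byte]): AnalyticEvent = AnalyticEvent(bytes)  // hypothetical deserializer

// Return None for any message whose payload fails to deserialize, so one
// corrupt offset cannot abort the whole batch.
val handler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
  try Some(decode(mmd.message())) catch { case _: Exception => None }

val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
  DefaultDecoder, DefaultDecoder, Option[AnalyticEvent]](
  ssc, kafkaParams, fromOffsets, handler)

// Filter out the Nones before the rest of the processing.
val events = stream.flatMap(opt => opt)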
>>>>>>
>>>>>> If you aren't already recording offsets somewhere, and need to find
>>>>>> the offsets at the beginning of the topic, you can take a look at this
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/spark/blob/branch-1.3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L143
>>>>>>
>>>>>> as an example of querying offsets from Kafka.
>>>>>>
>>>>>> That code is private, but you can either use it as an example, or
>>>>>> remove the private[spark] and recompile just the spark-streaming-kafka
>>>>>> package.  That artifact is included in your job assembly, so you won't have
>>>>>> to redeploy spark if you go that route.
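
For reference, a sketch of that lookup against the branch-1.3 KafkaCluster
linked above (the method names come from that file; since the class is
private[spark], you would copy the idea or remove the modifier and rebuild,
as described -- "your-topic" is a placeholder):

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.KafkaCluster

val kc = new KafkaCluster(kafkaParams)
// Resolve the topic's partitions, then ask each leader for its earliest
// offset -- i.e. the beginning of the topic.
val fromOffsets: Map[TopicAndPartition, Long] = (for {
  partitions    <- kc.getPartitions(Set("your-topic")).right
  leaderOffsets <- kc.getEarliestLeaderOffsets(partitions).right
} yield leaderOffsets.map { case (tp, lo) => tp -> lo.offset }
).fold(errs => throw new RuntimeException(errs.toString), identity)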
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 21, 2015 at 6:42 AM, Nicolas Phung <
>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Cody,
>>>>>>>
>>>>>>> Thanks for your answer. I'm with Spark 1.3.0. I don't quite
>>>>>>> understand how to use the messageHandler parameter/function in the
>>>>>>> createDirectStream method. You are referring to this, aren't you ?
>>>>>>>
>>>>>>> def createDirectStream[
>>>>>>>     K: ClassTag, V: ClassTag,
>>>>>>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag,
>>>>>>>     R: ClassTag](
>>>>>>>   ssc: StreamingContext,
>>>>>>>   kafkaParams: Map[String, String],
>>>>>>>   fromOffsets: Map[TopicAndPartition, Long],
>>>>>>>   messageHandler: MessageAndMetadata[K, V] => R
>>>>>>> ): InputDStream[R] = {
>>>>>>>   new DirectKafkaInputDStream[K, V, KD, VD, R](ssc, kafkaParams, fromOffsets, messageHandler)
>>>>>>> }
>>>>>>>
>>>>>>> So, I must supply the fromOffsets parameter too, but how do I tell
>>>>>>> this method to read from the beginning of my topic?
>>>>>>>
>>>>>>> If I have a filter (e.g. on an R.date field) on my R class, can I put
>>>>>>> that filter in the messageHandler function too?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Nicolas P.
>>>>>>>
>>>>>>> On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yeah, in the function you supply for the messageHandler parameter
>>>>>>>> to createDirectStream, catch the exception and do whatever makes sense for
>>>>>>>> your application.
>>>>>>>>
>>>>>>>> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <
>>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> Using the old Spark Streaming Kafka API, I got the following
>>>>>>>>> around the same offset:
>>>>>>>>>
>>>>>>>>> kafka.message.InvalidMessageException: Message is corrupt (stored
>>>>>>>>> crc = 3561357254, computed crc = 171652633)
>>>>>>>>>         at kafka.message.Message.ensureValid(Message.scala:166)
>>>>>>>>>         at
>>>>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>>>>>>>>         at
>>>>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>>>>>>>         at
>>>>>>>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>>>>>>>>         at
>>>>>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>>>>>>>>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling
>>>>>>>>> message
>>>>>>>>> java.lang.IllegalStateException: Iterator is in failed state
>>>>>>>>>         at
>>>>>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>
>>>>>>>>> I found an old thread about possibly corrupt Kafka messages
>>>>>>>>> produced by the new producer API with Snappy compression on. My question
>>>>>>>>> is: is it possible to skip/ignore those offsets when doing a full
>>>>>>>>> reprocessing with KafkaUtils.createStream or KafkaUtils.createDirectStream?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Nicolas PHUNG
>>>>>>>>>
>>>>>>>>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <
>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> I'd try logging the offsets for each message, see where problems
>>>>>>>>>> start, then try using the console consumer starting at those offsets and
>>>>>>>>>> see if you can reproduce the problem.
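
A minimal sketch of the logging half of that suggestion (Spark 1.3-era API;
the byte-array key/value types are just for illustration):

import kafka.message.MessageAndMetadata

// Log topic/partition/offset before touching the payload, so the last line
// printed before the failure points at the problem offset.
val loggingHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => {
  println(s"offset ${mmd.topic}/${mmd.partition}/${mmd.offset}")
  mmd.message()  // payload access is what throws on a corrupt message
}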
>>>>>>>>>>
>>>>>>>>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <
>>>>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Cody,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your help. It seems there's something wrong with some
>>>>>>>>>>> messages within my Kafka topics, then. I don't understand how I can get a
>>>>>>>>>>> bigger or incomplete message, since I use the default configuration that
>>>>>>>>>>> accepts only 1 MB messages in my Kafka topic. If you have any other
>>>>>>>>>>> information or suggestions, please tell me.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Nicolas PHUNG
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <
>>>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Not exactly the same issue, but possibly related:
>>>>>>>>>>>>
>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <
>>>>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Well, working backwards down the stack trace...
>>>>>>>>>>>>>
>>>>>>>>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>>>>>
>>>>>>>>>>>>> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>>>>>
>>>>>>>>>>>>> If size had been negative, it would have just returned null,
>>>>>>>>>>>>> so we know the exception got thrown because the size was greater than the
>>>>>>>>>>>>> buffer's capacity
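
For context, here is the failing call in isolation (sliceDelimited reads a
4-byte size prefix, returns null for a negative size, and otherwise calls
limit(size) on the sliced buffer -- so a corrupt size larger than the
buffer's capacity throws exactly this exception):

import java.nio.ByteBuffer

val b = ByteBuffer.allocate(8)
b.limit(16)  // java.lang.IllegalArgumentException, as in the stack trace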
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> I haven't seen that before... maybe a corrupted message of
>>>>>>>>>>>>> some kind?
>>>>>>>>>>>>>
>>>>>>>>>>>>> If that problem is reproducible, try providing an explicit
>>>>>>>>>>>>> argument for messageHandler, with a function that logs the message offset.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>>>>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark Streaming Kafka method createDirectStream, everything is fine till a driver error happened (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>>>>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>>>>>>>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>>>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>>>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>>>>>>>>>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>>>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>>>>>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>>>>>>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>>>>>>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>>>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>>>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is happening only when I'm doing a full data processing
>>>>>>>>>>>>>> from Kafka. If there's no load, when you killed the driver and then
>>>>>>>>>>>>>> restart, it resumes the checkpoint as expected without missing data. Did
>>>>>>>>>>>>>> someone encounters something similar ? How did you solve this ?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Nicolas PHUNG
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Nicolas Phung <ni...@gmail.com>.
Hello,

I'm using 4 GB for the driver memory. The checkpoint is between 1 GB and 10
GB, depending on whether I'm reprocessing all the data from the beginning or
just picking up from the latest offset of the real-time processing. Is there
any best practice to be aware of regarding driver memory relative to
checkpoint size?

Regards,
Nicolas P.

On Tue, Jul 28, 2015 at 4:42 PM, Cody Koeninger <co...@koeninger.org> wrote:

> That stacktrace looks like an out of heap space on the driver while
> writing checkpoint, not on the worker nodes.  How much memory are you
> giving the driver?  How big are your stored checkpoints?
>
> On Tue, Jul 28, 2015 at 9:30 AM, Nicolas Phung <ni...@gmail.com>
> wrote:
>
>> Hi,
>>
>> After using KafkaUtils.createDirectStream[Object, Object,
>> KafkaAvroDecoder, KafkaAvroDecoder, Option[AnalyticEventEnriched]](ssc,
>> kafkaParams, map, messageHandler), I'm encountering the following issue:
>>
>> 15/07/28 00:29:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
>> thread [sparkDriver-akka.actor.default-dispatcher-24] shutting down
>> ActorSystem [sparkDriver]
>> java.lang.OutOfMemoryError: Java heap space
>>     at
>> java.io.ObjectOutputStream$HandleTable.growEntries(ObjectOutputStream.java:2351)
>>     at
>> java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2276)
>>     at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1428)
>>     at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>     at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>     at
>> scala.collection.mutable.HashMap$$anonfun$writeObject$1.apply(HashMap.scala:137)
>>     at
>> scala.collection.mutable.HashMap$$anonfun$writeObject$1.apply(HashMap.scala:135)
>>     at
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>>     at
>> scala.collection.mutable.HashTable$class.serializeTo(HashTable.scala:124)
>>     at scala.collection.mutable.HashMap.serializeTo(HashMap.scala:39)
>>     at scala.collection.mutable.HashMap.writeObject(HashMap.scala:135)
>>     at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
>>     at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:483)
>>     at
>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>     at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>     at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at
>> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
>>     at
>> org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:128)
>>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137)
>>     at
>> org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123)
>>     at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
>>     at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> I don't know why, after that, it's eating all the CPU on one of the node
>> till the entire job stopped. It tries to resume from checkpoint several
>> times but failed with this error too. I think I have enough spared memory
>> with 4 nodes with 24 Gb per nodes. It has processed successfully around 40
>> gb before that and looking into storage in Spark UI, I don't have a big rdd
>> stored in memory/disk. I notice on this node, there's an increase in
>> connection to kafka that are not closed too.
>>
>> Regards,
>> Nicolas P.
>>
>> On Fri, Jul 24, 2015 at 3:32 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> It's really a question of whether you need access to the
>>> MessageAndMetadata, or just the key / value from the message.
>>>
>>> If you just need the key/value, dstream map is fine.
>>>
>>> In your case, since you need to be able to control a possible failure
>>> when deserializing the message from the MessageAndMetadata, I'd just go
>>> ahead and do the work in the messageHandler.
>>>
>>> On Fri, Jul 24, 2015 at 2:46 AM, Nicolas Phung <ni...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I manage to read all my data back with skipping offset that contains a
>>>> corrupt message. I have one more question regarding messageHandler method
>>>> vs dstream.foreachRDD.map vs dstream.map.foreachRDD best practices. I'm
>>>> using a function to read the serialized message from kafka and convert it
>>>> into my appropriate object with some enrichments and sometimes add filter
>>>> after that. Where's the best spot to put this logic inside messageHandler
>>>> method (convert each message within this handler) or dstream.foreachRDD.map
>>>> (map rdd) or dstream.map.foreachRDD (map dstream) ?
>>>>
>>>> Thank you for your help Cody.
>>>> Regards,
>>>> Nicolas PHUNG
>>>>
>>>> On Tue, Jul 21, 2015 at 4:53 PM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Yeah, I'm referring to that api.
>>>>>
>>>>> If you want to filter messages in addition to catching that exception,
>>>>> have your messageHandler return an Option, so the type R would end up being
>>>>> Option[WhateverYourClassIs], then filter out None before doing the rest of
>>>>> your processing.
>>>>>
>>>>> If you aren't already recording offsets somewhere, and need to find
>>>>> the offsets at the beginning of the topic, you can take a look at this
>>>>>
>>>>>
>>>>> https://github.com/apache/spark/blob/branch-1.3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L143
>>>>>
>>>>> as an example of querying offsets from Kafka.
>>>>>
>>>>> That code is private, but you can either use it as an example, or
>>>>> remove the private[spark] and recompile just the spark-streaming-kafka
>>>>> package.  That artifact is included in your job assembly, so you won't have
>>>>> to redeploy spark if you go that route.
>>>>>
>>>>>
>>>>> On Tue, Jul 21, 2015 at 6:42 AM, Nicolas Phung <
>>>>> nicolas.phung@gmail.com> wrote:
>>>>>
>>>>>> Hi Cody,
>>>>>>
>>>>>> Thanks for your answer. I'm with Spark 1.3.0. I don't quite
>>>>>> understand how to use the messageHandler parameter/function in the
>>>>>> createDirectStream method. You are referring to this, aren't you ?
>>>>>>
>>>>>> def createDirectStream[
>>>>>>     K: ClassTag, V: ClassTag,
>>>>>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag,
>>>>>>     R: ClassTag](
>>>>>>   ssc: StreamingContext,
>>>>>>   kafkaParams: Map[String, String],
>>>>>>   fromOffsets: Map[TopicAndPartition, Long],
>>>>>>   messageHandler: MessageAndMetadata[K, V] => R
>>>>>> ): InputDStream[R] = {
>>>>>>   new DirectKafkaInputDStream[K, V, KD, VD, R](ssc, kafkaParams, fromOffsets, messageHandler)
>>>>>> }
>>>>>>
>>>>>> So, I must supply the fromOffsets parameter too, but how do I tell
>>>>>> this method to read from the beginning of my topic ?
>>>>>>
>>>>>> If I have a filter (e.g. a R.date field) on my R class, I can put a
>>>>>> filter in the messageHandler function too ?
>>>>>>
>>>>>> Regards,
>>>>>> Nicolas P.
>>>>>>
>>>>>> On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Yeah, in the function you supply for the messageHandler parameter to
>>>>>>> createDirectStream, catch the exception and do whatever makes sense for
>>>>>>> your application.
>>>>>>>
>>>>>>> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <
>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> Using the old Spark Streaming Kafka API, I got the following around
>>>>>>>> the same offset:
>>>>>>>>
>>>>>>>> kafka.message.InvalidMessageException: Message is corrupt (stored
>>>>>>>> crc = 3561357254, computed crc = 171652633)
>>>>>>>>         at kafka.message.Message.ensureValid(Message.scala:166)
>>>>>>>>         at
>>>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>>>>>>>         at
>>>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>>>>>>         at
>>>>>>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>>>>>>>         at
>>>>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>>>>>>>         at
>>>>>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>>>>         at
>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>         at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>         at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>>>>>>>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling
>>>>>>>> message
>>>>>>>> java.lang.IllegalStateException: Iterator is in failed state
>>>>>>>>         at
>>>>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>>>>>>>         at
>>>>>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>>>>         at
>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>         at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>         at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>>
>>>>>>>> I found some old topic about some possible corrupt Kafka message
>>>>>>>> produced by the new producer API with Snappy compression on. My question
>>>>>>>> is, is it possible to skip/ignore those offsets when full processing with
>>>>>>>> KafkaUtils.createStream or KafkaUtils.createDirectStream ?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Nicolas PHUNG
>>>>>>>>
>>>>>>>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <cody@koeninger.org
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> I'd try logging the offsets for each message, see where problems
>>>>>>>>> start, then try using the console consumer starting at those offsets and
>>>>>>>>> see if you can reproduce the problem.
>>>>>>>>>
>>>>>>>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <
>>>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Cody,
>>>>>>>>>>
>>>>>>>>>> Thanks for you help. It seems there's something wrong with some
>>>>>>>>>> messages within my Kafka topics then. I don't understand how, I can get
>>>>>>>>>> bigger or incomplete message since I use default configuration to accept
>>>>>>>>>> only 1Mb message in my Kafka topic. If you have any others informations or
>>>>>>>>>> suggestions, please tell me.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Nicolas PHUNG
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <
>>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Not exactly the same issue, but possibly related:
>>>>>>>>>>>
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <
>>>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Well, working backwards down the stack trace...
>>>>>>>>>>>>
>>>>>>>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>>>>
>>>>>>>>>>>> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>>>>
>>>>>>>>>>>> If size had been negative, it would have just returned null, so
>>>>>>>>>>>> we know the exception got thrown because the size was greater than the
>>>>>>>>>>>> buffer's capacity
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I haven't seen that before... maybe a corrupted message of some
>>>>>>>>>>>> kind?
>>>>>>>>>>>>
>>>>>>>>>>>> If that problem is reproducible, try providing an explicit
>>>>>>>>>>>> argument for messageHandler, with a function that logs the message offset.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>>>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>
>>>>>>>>>>>>> When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark Streaming Kafka method createDirectStream, everything is fine till a driver error happened (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>>>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>>>>>>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>>>>>>>>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>>>>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>>>>>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>>>>>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is happening only when I'm doing a full data processing
>>>>>>>>>>>>> from Kafka. If there's no load, when you killed the driver and then
>>>>>>>>>>>>> restart, it resumes the checkpoint as expected without missing data. Did
>>>>>>>>>>>>> someone encounters something similar ? How did you solve this ?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Nicolas PHUNG
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Cody Koeninger <co...@koeninger.org>.
That stacktrace looks like an out-of-heap-space error on the driver while
writing the checkpoint, not on the worker nodes.  How much memory are you
giving the driver?  How big are your stored checkpoints?

On Tue, Jul 28, 2015 at 9:30 AM, Nicolas Phung <ni...@gmail.com>
wrote:

> Hi,
>
> After using KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder, Option[AnalyticEventEnriched]](ssc,
> kafkaParams, map, messageHandler), I'm encountering the following issue:
>
> 15/07/28 00:29:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
> thread [sparkDriver-akka.actor.default-dispatcher-24] shutting down
> ActorSystem [sparkDriver]
> java.lang.OutOfMemoryError: Java heap space
>     at
> java.io.ObjectOutputStream$HandleTable.growEntries(ObjectOutputStream.java:2351)
>     at
> java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2276)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1428)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at
> scala.collection.mutable.HashMap$$anonfun$writeObject$1.apply(HashMap.scala:137)
>     at
> scala.collection.mutable.HashMap$$anonfun$writeObject$1.apply(HashMap.scala:135)
>     at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>     at
> scala.collection.mutable.HashTable$class.serializeTo(HashTable.scala:124)
>     at scala.collection.mutable.HashMap.serializeTo(HashMap.scala:39)
>     at scala.collection.mutable.HashMap.writeObject(HashMap.scala:135)
>     at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:483)
>     at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
>     at
> org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:128)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137)
>     at
> org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123)
>     at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> I don't know why, after that, it's eating all the CPU on one of the node
> till the entire job stopped. It tries to resume from checkpoint several
> times but failed with this error too. I think I have enough spared memory
> with 4 nodes with 24 Gb per nodes. It has processed successfully around 40
> gb before that and looking into storage in Spark UI, I don't have a big rdd
> stored in memory/disk. I notice on this node, there's an increase in
> connection to kafka that are not closed too.
>
> Regards,
> Nicolas P.
>
> On Fri, Jul 24, 2015 at 3:32 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> It's really a question of whether you need access to the
>> MessageAndMetadata, or just the key / value from the message.
>>
>> If you just need the key/value, dstream map is fine.
>>
>> In your case, since you need to be able to control a possible failure
>> when deserializing the message from the MessageAndMetadata, I'd just go
>> ahead and do the work in the messageHandler.
>>
>> On Fri, Jul 24, 2015 at 2:46 AM, Nicolas Phung <ni...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I manage to read all my data back with skipping offset that contains a
>>> corrupt message. I have one more question regarding messageHandler method
>>> vs dstream.foreachRDD.map vs dstream.map.foreachRDD best practices. I'm
>>> using a function to read the serialized message from kafka and convert it
>>> into my appropriate object with some enrichments and sometimes add filter
>>> after that. Where's the best spot to put this logic inside messageHandler
>>> method (convert each message within this handler) or dstream.foreachRDD.map
>>> (map rdd) or dstream.map.foreachRDD (map dstream) ?
>>>
>>> Thank you for your help Cody.
>>> Regards,
>>> Nicolas PHUNG
>>>
>>> On Tue, Jul 21, 2015 at 4:53 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> Yeah, I'm referring to that api.
>>>>
>>>> If you want to filter messages in addition to catching that exception,
>>>> have your messageHandler return an Option, so the type R would end up being
>>>> Option[WhateverYourClassIs], then filter out None before doing the rest of
>>>> your processing.
>>>>
>>>> If you aren't already recording offsets somewhere, and need to find the
>>>> offsets at the beginning of the topic, you can take a look at this
>>>>
>>>>
>>>> https://github.com/apache/spark/blob/branch-1.3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L143
>>>>
>>>> as an example of querying offsets from Kafka.
>>>>
>>>> That code is private, but you can either use it as an example, or
>>>> remove the private[spark] and recompile just the spark-streaming-kafka
>>>> package.  That artifact is included in your job assembly, so you won't have
>>>> to redeploy spark if you go that route.
>>>>
>>>>
>>>> On Tue, Jul 21, 2015 at 6:42 AM, Nicolas Phung <nicolas.phung@gmail.com
>>>> > wrote:
>>>>
>>>>> Hi Cody,
>>>>>
>>>>> Thanks for your answer. I'm with Spark 1.3.0. I don't quite understand
>>>>> how to use the messageHandler parameter/function in the createDirectStream
>>>>> method. You are referring to this, aren't you ?
>>>>>
>>>>> def createDirectStream[
>>>>>     K: ClassTag, V: ClassTag,
>>>>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag,
>>>>>     R: ClassTag](
>>>>>   ssc: StreamingContext,
>>>>>   kafkaParams: Map[String, String],
>>>>>   fromOffsets: Map[TopicAndPartition, Long],
>>>>>   messageHandler: MessageAndMetadata[K, V] => R
>>>>> ): InputDStream[R] = {
>>>>>   new DirectKafkaInputDStream[K, V, KD, VD, R](ssc, kafkaParams, fromOffsets, messageHandler)
>>>>> }
>>>>>
>>>>> So, I must supply the fromOffsets parameter too, but how do I tell
>>>>> this method to read from the beginning of my topic ?
>>>>>
>>>>> If I have a filter (e.g. a R.date field) on my R class, I can put a
>>>>> filter in the messageHandler function too ?
>>>>>
>>>>> Regards,
>>>>> Nicolas P.
>>>>>
>>>>> On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Yeah, in the function you supply for the messageHandler parameter to
>>>>>> createDirectStream, catch the exception and do whatever makes sense for
>>>>>> your application.
>>>>>>
>>>>>> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <
>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Using the old Spark Streaming Kafka API, I got the following around
>>>>>>> the same offset:
>>>>>>>
>>>>>>> kafka.message.InvalidMessageException: Message is corrupt (stored
>>>>>>> crc = 3561357254, computed crc = 171652633)
>>>>>>>         at kafka.message.Message.ensureValid(Message.scala:166)
>>>>>>>         at
>>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>>>>>>         at
>>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>>>>>         at
>>>>>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>>>>>>         at
>>>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>>>         at
>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>         at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>         at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>>>>>>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling
>>>>>>> message
>>>>>>> java.lang.IllegalStateException: Iterator is in failed state
>>>>>>>         at
>>>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>>>         at
>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>         at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>         at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> I found some old topic about some possible corrupt Kafka message
>>>>>>> produced by the new producer API with Snappy compression on. My question
>>>>>>> is, is it possible to skip/ignore those offsets when full processing with
>>>>>>> KafkaUtils.createStream or KafkaUtils.createDirectStream ?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Nicolas PHUNG
>>>>>>>
>>>>>>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <co...@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'd try logging the offsets for each message, see where problems
>>>>>>>> start, then try using the console consumer starting at those offsets and
>>>>>>>> see if you can reproduce the problem.
>>>>>>>>
>>>>>>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <
>>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Cody,
>>>>>>>>>
>>>>>>>>> Thanks for you help. It seems there's something wrong with some
>>>>>>>>> messages within my Kafka topics then. I don't understand how, I can get
>>>>>>>>> bigger or incomplete message since I use default configuration to accept
>>>>>>>>> only 1Mb message in my Kafka topic. If you have any others informations or
>>>>>>>>> suggestions, please tell me.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Nicolas PHUNG
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <
>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> Not exactly the same issue, but possibly related:
>>>>>>>>>>
>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <
>>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Well, working backwards down the stack trace...
>>>>>>>>>>>
>>>>>>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>>>
>>>>>>>>>>> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>>>
>>>>>>>>>>> If size had been negative, it would have just returned null, so
>>>>>>>>>>> we know the exception got thrown because the size was greater than the
>>>>>>>>>>> buffer's capacity
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I haven't seen that before... maybe a corrupted message of some
>>>>>>>>>>> kind?
>>>>>>>>>>>
>>>>>>>>>>> If that problem is reproducible, try providing an explicit
>>>>>>>>>>> argument for messageHandler, with a function that logs the message offset.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark Streaming Kafka method createDirectStream, everything is fine till a driver error happened (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this:
>>>>>>>>>>>>
>>>>>>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>>>>>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>>>>>>>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>>>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>>>>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>>>>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>
>>>>>>>>>>>> This is happening only when I'm doing a full data processing
>>>>>>>>>>>> from Kafka. If there's no load, when you killed the driver and then
>>>>>>>>>>>> restart, it resumes the checkpoint as expected without missing data. Did
>>>>>>>>>>>> someone encounters something similar ? How did you solve this ?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Nicolas PHUNG
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Nicolas Phung <ni...@gmail.com>.
Hi,

After using KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder,
KafkaAvroDecoder, Option[AnalyticEventEnriched]](ssc, kafkaParams, map,
messageHandler), I'm encountering the following issue:

15/07/28 00:29:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-24] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
	at java.io.ObjectOutputStream$HandleTable.growEntries(ObjectOutputStream.java:2351)
	at java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2276)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1428)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at scala.collection.mutable.HashMap$$anonfun$writeObject$1.apply(HashMap.scala:137)
	at scala.collection.mutable.HashMap$$anonfun$writeObject$1.apply(HashMap.scala:135)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
	at scala.collection.mutable.HashTable$class.serializeTo(HashTable.scala:124)
	at scala.collection.mutable.HashMap.serializeTo(HashMap.scala:39)
	at scala.collection.mutable.HashMap.writeObject(HashMap.scala:135)
	at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
	at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:128)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137)
	at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123)
	at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

I don't know why, but after that it eats all the CPU on one of the nodes until
the entire job stops. It tries to resume from the checkpoint several times but
fails with this error each time. I think I have enough spare memory, with 4
nodes at 24 Gb per node. It had successfully processed around 40 Gb before
this happened, and looking at the Storage tab in the Spark UI, I don't see any
big RDD kept in memory or on disk. I also notice a growing number of
connections to Kafka from this node that are never closed.
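
Since the OOM is thrown while the driver serializes DStreamCheckpointData, I
suspect the driver heap (not executor memory) is what's under pressure. One
experiment I may try is capping batch sizes during the replay. A rough sketch,
assuming my Spark build supports the spark.streaming.kafka.maxRatePerPartition
setting (the values are placeholders to tune):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Bound each batch to ~10k messages/sec/partition while the backlog drains
// (placeholder value). The driver heap itself would be raised with
// spark-submit --driver-memory rather than in code.
val conf = new SparkConf()
  .setAppName("kafka-replay") // placeholder app name
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")
val ssc = new StreamingContext(conf, Seconds(10))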

Regards,
Nicolas P.

On Fri, Jul 24, 2015 at 3:32 PM, Cody Koeninger <co...@koeninger.org> wrote:

> It's really a question of whether you need access to the
> MessageAndMetadata, or just the key / value from the message.
>
> If you just need the key/value, dstream map is fine.
>
> In your case, since you need to be able to control a possible failure when
> deserializing the message from the MessageAndMetadata, I'd just go ahead
> and do the work in the messageHandler.
>
> On Fri, Jul 24, 2015 at 2:46 AM, Nicolas Phung <ni...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I manage to read all my data back with skipping offset that contains a
>> corrupt message. I have one more question regarding messageHandler method
>> vs dstream.foreachRDD.map vs dstream.map.foreachRDD best practices. I'm
>> using a function to read the serialized message from kafka and convert it
>> into my appropriate object with some enrichments and sometimes add filter
>> after that. Where's the best spot to put this logic inside messageHandler
>> method (convert each message within this handler) or dstream.foreachRDD.map
>> (map rdd) or dstream.map.foreachRDD (map dstream) ?
>>
>> Thank you for your help Cody.
>> Regards,
>> Nicolas PHUNG
>>
>> On Tue, Jul 21, 2015 at 4:53 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Yeah, I'm referring to that api.
>>>
>>> If you want to filter messages in addition to catching that exception,
>>> have your mesageHandler return an option, so the type R would end up being
>>> Option[WhateverYourClassIs], then filter out None before doing the rest of
>>> your processing.
>>>
>>> If you aren't already recording offsets somewhere, and need to find the
>>> offsets at the beginning of the topic, you can take a look at this
>>>
>>>
>>> https://github.com/apache/spark/blob/branch-1.3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L143
>>>
>>> as an example of querying offsets from Kafka.
>>>
>>> That code is private, but you can either use it as an example, or remove
>>> the private[spark] and recompile just the spark-streaming-kafka package.
>>> That artifact is included in your job assembly, so you won't have to
>>> redeploy spark if you go that route.
>>>
>>>
>>> On Tue, Jul 21, 2015 at 6:42 AM, Nicolas Phung <ni...@gmail.com>
>>> wrote:
>>>
>>>> Hi Cody,
>>>>
>>>> Thanks for your answer. I'm with Spark 1.3.0. I don't quite understand
>>>> how to use the messageHandler parameter/function in the createDirectStream
>>>> method. You are referring to this, aren't you ?
>>>>
>>>> def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]:
>>>> ClassTag, VD <: Decoder[V]: ClassTag, R: ClassTag] ( ssc:
>>>> StreamingContext, kafkaParams: Map[String, String], fromOffsets: Map[
>>>> TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R
>>>> ): InputDStream[R] = { new DirectKafkaInputDStream[K, V, KD, VD, R](
>>>> ssc, kafkaParams, fromOffsets, messageHandler) }
>>>>
>>>> So, I must supply the fromOffsets parameter too, but how do I tell this
>>>> method to read from the beginning of my topic ?
>>>>
>>>> If I have a filter (e.g. a R.date field) on my R class, I can put a
>>>> filter in the messageHandler function too ?
>>>>
>>>> Regards,
>>>> Nicolas P.
>>>>
>>>> On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Yeah, in the function you supply for the messageHandler parameter to
>>>>> createDirectStream, catch the exception and do whatever makes sense for
>>>>> your application.
>>>>>
>>>>> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <
>>>>> nicolas.phung@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Using the old Spark Streaming Kafka API, I got the following around
>>>>>> the same offset:
>>>>>>
>>>>>> kafka.message.InvalidMessageException: Message is corrupt (stored crc
>>>>>> = 3561357254, computed crc = 171652633)
>>>>>>         at kafka.message.Message.ensureValid(Message.scala:166)
>>>>>>         at
>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>>>>>         at
>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>>>>         at
>>>>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>>>>>         at
>>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>>>>>         at
>>>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>>         at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>>>>>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
>>>>>> java.lang.IllegalStateException: Iterator is in failed state
>>>>>>         at
>>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>>>>>         at
>>>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>>         at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> I found some old topic about some possible corrupt Kafka message
>>>>>> produced by the new producer API with Snappy compression on. My question
>>>>>> is, is it possible to skip/ignore those offsets when full processing with
>>>>>> KafkaUtils.createStream or KafkaUtils.createDirectStream ?
>>>>>>
>>>>>> Regards,
>>>>>> Nicolas PHUNG
>>>>>>
>>>>>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <co...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I'd try logging the offsets for each message, see where problems
>>>>>>> start, then try using the console consumer starting at those offsets and
>>>>>>> see if you can reproduce the problem.
>>>>>>>
>>>>>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <
>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Cody,
>>>>>>>>
>>>>>>>> Thanks for you help. It seems there's something wrong with some
>>>>>>>> messages within my Kafka topics then. I don't understand how, I can get
>>>>>>>> bigger or incomplete message since I use default configuration to accept
>>>>>>>> only 1Mb message in my Kafka topic. If you have any others informations or
>>>>>>>> suggestions, please tell me.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Nicolas PHUNG
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <cody@koeninger.org
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Not exactly the same issue, but possibly related:
>>>>>>>>>
>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <
>>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> Well, working backwards down the stack trace...
>>>>>>>>>>
>>>>>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>>
>>>>>>>>>> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>>
>>>>>>>>>> If size had been negative, it would have just returned null, so
>>>>>>>>>> we know the exception got thrown because the size was greater than the
>>>>>>>>>> buffer's capacity
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I haven't seen that before... maybe a corrupted message of some
>>>>>>>>>> kind?
>>>>>>>>>>
>>>>>>>>>> If that problem is reproducible, try providing an explicit
>>>>>>>>>> argument for messageHandler, with a function that logs the message offset.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark Streaming Kafka method createDirectStream, everything is fine till a driver error happened (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this:
>>>>>>>>>>>
>>>>>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>>>>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>>>>>>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>>>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>>>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>>>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>
>>>>>>>>>>> This is happening only when I'm doing a full data processing
>>>>>>>>>>> from Kafka. If there's no load, when you killed the driver and then
>>>>>>>>>>> restart, it resumes the checkpoint as expected without missing data. Did
>>>>>>>>>>> someone encounters something similar ? How did you solve this ?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> Nicolas PHUNG
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Cody Koeninger <co...@koeninger.org>.
It's really a question of whether you need access to the MessageAndMetadata,
or just the key/value from the message.

If you just need the key/value, a dstream map is fine.

In your case, since you need to handle a possible failure when deserializing
the message from the MessageAndMetadata, I'd just go ahead and do the work in
the messageHandler.
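
To make that concrete, here's a minimal sketch of that split. Every name in it
(MyEvent, decodeAndEnrich, the broker, topic, starting offsets and the ES
index) is a placeholder, not something from your setup:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark.rdd.EsSpark

case class MyEvent(payload: String) // placeholder for your enriched type

// Placeholder for your real Avro decode + enrichment.
def decodeAndEnrich(bytes: Array[Byte]): MyEvent =
  MyEvent(new String(bytes, "UTF-8"))

val ssc = new StreamingContext(new SparkConf().setAppName("sketch"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // placeholder
val fromOffsets = Map(TopicAndPartition("my-topic", 0) -> 0L)     // placeholder

// All the per-message work happens in the handler; a decode failure
// becomes None instead of failing the whole batch.
val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
  try Some(decodeAndEnrich(mmd.message()))
  catch { case _: Exception => None }

val stream = KafkaUtils.createDirectStream[
  Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Option[MyEvent]](
  ssc, kafkaParams, fromOffsets, messageHandler)

// The dstream side just drops the Nones and writes out.
stream.flatMap(_.toList).foreachRDD(rdd => EsSpark.saveToEs(rdd, "events/event"))

That way a bad message costs you one None, and the output side never sees it.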

On Fri, Jul 24, 2015 at 2:46 AM, Nicolas Phung <ni...@gmail.com>
wrote:

> Hello,
>
> I manage to read all my data back with skipping offset that contains a
> corrupt message. I have one more question regarding messageHandler method
> vs dstream.foreachRDD.map vs dstream.map.foreachRDD best practices. I'm
> using a function to read the serialized message from kafka and convert it
> into my appropriate object with some enrichments and sometimes add filter
> after that. Where's the best spot to put this logic inside messageHandler
> method (convert each message within this handler) or dstream.foreachRDD.map
> (map rdd) or dstream.map.foreachRDD (map dstream) ?
>
> Thank you for your help Cody.
> Regards,
> Nicolas PHUNG
>
> On Tue, Jul 21, 2015 at 4:53 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> Yeah, I'm referring to that api.
>>
>> If you want to filter messages in addition to catching that exception,
>> have your mesageHandler return an option, so the type R would end up being
>> Option[WhateverYourClassIs], then filter out None before doing the rest of
>> your processing.
>>
>> If you aren't already recording offsets somewhere, and need to find the
>> offsets at the beginning of the topic, you can take a look at this
>>
>>
>> https://github.com/apache/spark/blob/branch-1.3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L143
>>
>> as an example of querying offsets from Kafka.
>>
>> That code is private, but you can either use it as an example, or remove
>> the private[spark] and recompile just the spark-streaming-kafka package.
>> That artifact is included in your job assembly, so you won't have to
>> redeploy spark if you go that route.
>>
>>
>> On Tue, Jul 21, 2015 at 6:42 AM, Nicolas Phung <ni...@gmail.com>
>> wrote:
>>
>>> Hi Cody,
>>>
>>> Thanks for your answer. I'm with Spark 1.3.0. I don't quite understand
>>> how to use the messageHandler parameter/function in the createDirectStream
>>> method. You are referring to this, aren't you ?
>>>
>>> def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]:
>>> ClassTag, VD <: Decoder[V]: ClassTag, R: ClassTag] ( ssc:
>>> StreamingContext, kafkaParams: Map[String, String], fromOffsets: Map[
>>> TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R
>>> ): InputDStream[R] = { new DirectKafkaInputDStream[K, V, KD, VD, R](
>>> ssc, kafkaParams, fromOffsets, messageHandler) }
>>>
>>> So, I must supply the fromOffsets parameter too, but how do I tell this
>>> method to read from the beginning of my topic ?
>>>
>>> If I have a filter (e.g. a R.date field) on my R class, I can put a
>>> filter in the messageHandler function too ?
>>>
>>> Regards,
>>> Nicolas P.
>>>
>>> On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> Yeah, in the function you supply for the messageHandler parameter to
>>>> createDirectStream, catch the exception and do whatever makes sense for
>>>> your application.
>>>>
>>>> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <
>>>> nicolas.phung@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Using the old Spark Streaming Kafka API, I got the following around
>>>>> the same offset:
>>>>>
>>>>> kafka.message.InvalidMessageException: Message is corrupt (stored crc
>>>>> = 3561357254, computed crc = 171652633)
>>>>>         at kafka.message.Message.ensureValid(Message.scala:166)
>>>>>         at
>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>>>>         at
>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>>>         at
>>>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>>>>         at
>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>         at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>>>>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
>>>>> java.lang.IllegalStateException: Iterator is in failed state
>>>>>         at
>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>>         at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> I found some old topic about some possible corrupt Kafka message
>>>>> produced by the new producer API with Snappy compression on. My question
>>>>> is, is it possible to skip/ignore those offsets when full processing with
>>>>> KafkaUtils.createStream or KafkaUtils.createDirectStream ?
>>>>>
>>>>> Regards,
>>>>> Nicolas PHUNG
>>>>>
>>>>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> I'd try logging the offsets for each message, see where problems
>>>>>> start, then try using the console consumer starting at those offsets and
>>>>>> see if you can reproduce the problem.
>>>>>>
>>>>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <
>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Cody,
>>>>>>>
>>>>>>> Thanks for you help. It seems there's something wrong with some
>>>>>>> messages within my Kafka topics then. I don't understand how, I can get
>>>>>>> bigger or incomplete message since I use default configuration to accept
>>>>>>> only 1Mb message in my Kafka topic. If you have any others informations or
>>>>>>> suggestions, please tell me.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Nicolas PHUNG
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Not exactly the same issue, but possibly related:
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <
>>>>>>>> cody@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> Well, working backwards down the stack trace...
>>>>>>>>>
>>>>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>
>>>>>>>>> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>
>>>>>>>>> If size had been negative, it would have just returned null, so we
>>>>>>>>> know the exception got thrown because the size was greater than the
>>>>>>>>> buffer's capacity
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I haven't seen that before... maybe a corrupted message of some
>>>>>>>>> kind?
>>>>>>>>>
>>>>>>>>> If that problem is reproducible, try providing an explicit
>>>>>>>>> argument for messageHandler, with a function that logs the message offset.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark Streaming Kafka method createDirectStream, everything is fine till a driver error happened (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this:
>>>>>>>>>>
>>>>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>>>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>>>>>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>
>>>>>>>>>> This is happening only when I'm doing a full data processing from
>>>>>>>>>> Kafka. If there's no load, when you killed the driver and then restart, it
>>>>>>>>>> resumes the checkpoint as expected without missing data. Did someone
>>>>>>>>>> encounters something similar ? How did you solve this ?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Nicolas PHUNG
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Nicolas Phung <ni...@gmail.com>.
Hello,

I managed to read all my data back by skipping the offsets that contain a
corrupt message. I have one more question about best practices: messageHandler
method vs dstream.foreachRDD.map vs dstream.map.foreachRDD. I'm using a
function to read each serialized message from Kafka and convert it into my
appropriate object, with some enrichments and sometimes a filter after that.
What's the best spot for this logic: inside the messageHandler method
(converting each message within the handler), dstream.foreachRDD.map (mapping
each RDD), or dstream.map.foreachRDD (mapping the dstream)?

Thank you for your help Cody.
Regards,
Nicolas PHUNG

On Tue, Jul 21, 2015 at 4:53 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Yeah, I'm referring to that api.
>
> If you want to filter messages in addition to catching that exception,
> have your mesageHandler return an option, so the type R would end up being
> Option[WhateverYourClassIs], then filter out None before doing the rest of
> your processing.
>
> If you aren't already recording offsets somewhere, and need to find the
> offsets at the beginning of the topic, you can take a look at this
>
>
> https://github.com/apache/spark/blob/branch-1.3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L143
>
> as an example of querying offsets from Kafka.
>
> That code is private, but you can either use it as an example, or remove
> the private[spark] and recompile just the spark-streaming-kafka package.
> That artifact is included in your job assembly, so you won't have to
> redeploy spark if you go that route.
>
>
> On Tue, Jul 21, 2015 at 6:42 AM, Nicolas Phung <ni...@gmail.com>
> wrote:
>
>> Hi Cody,
>>
>> Thanks for your answer. I'm with Spark 1.3.0. I don't quite understand
>> how to use the messageHandler parameter/function in the createDirectStream
>> method. You are referring to this, aren't you ?
>>
>> def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]:
>> ClassTag, VD <: Decoder[V]: ClassTag, R: ClassTag] ( ssc:
>> StreamingContext, kafkaParams: Map[String, String], fromOffsets: Map[
>> TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R )
>> : InputDStream[R] = { new DirectKafkaInputDStream[K, V, KD, VD, R]( ssc,
>> kafkaParams, fromOffsets, messageHandler) }
>>
>> So, I must supply the fromOffsets parameter too, but how do I tell this
>> method to read from the beginning of my topic ?
>>
>> If I have a filter (e.g. a R.date field) on my R class, I can put a
>> filter in the messageHandler function too ?
>>
>> Regards,
>> Nicolas P.
>>
>> On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Yeah, in the function you supply for the messageHandler parameter to
>>> createDirectStream, catch the exception and do whatever makes sense for
>>> your application.
>>>
>>> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <nicolas.phung@gmail.com
>>> > wrote:
>>>
>>>> Hello,
>>>>
>>>> Using the old Spark Streaming Kafka API, I got the following around the
>>>> same offset:
>>>>
>>>> kafka.message.InvalidMessageException: Message is corrupt (stored crc =
>>>> 3561357254, computed crc = 171652633)
>>>>         at kafka.message.Message.ensureValid(Message.scala:166)
>>>>         at
>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>>>         at
>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>>         at
>>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>>>         at
>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>>>         at
>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>         at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>>>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
>>>> java.lang.IllegalStateException: Iterator is in failed state
>>>>         at
>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>>>         at
>>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>         at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> I found some old topic about some possible corrupt Kafka message
>>>> produced by the new producer API with Snappy compression on. My question
>>>> is, is it possible to skip/ignore those offsets when full processing with
>>>> KafkaUtils.createStream or KafkaUtils.createDirectStream ?
>>>>
>>>> Regards,
>>>> Nicolas PHUNG
>>>>
>>>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> I'd try logging the offsets for each message, see where problems
>>>>> start, then try using the console consumer starting at those offsets and
>>>>> see if you can reproduce the problem.
>>>>>
>>>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <
>>>>> nicolas.phung@gmail.com> wrote:
>>>>>
>>>>>> Hi Cody,
>>>>>>
>>>>>> Thanks for you help. It seems there's something wrong with some
>>>>>> messages within my Kafka topics then. I don't understand how, I can get
>>>>>> bigger or incomplete message since I use default configuration to accept
>>>>>> only 1Mb message in my Kafka topic. If you have any others informations or
>>>>>> suggestions, please tell me.
>>>>>>
>>>>>> Regards,
>>>>>> Nicolas PHUNG
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Not exactly the same issue, but possibly related:
>>>>>>>
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <cody@koeninger.org
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Well, working backwards down the stack trace...
>>>>>>>>
>>>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>
>>>>>>>> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>>>>>>>>
>>>>>>>>
>>>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>
>>>>>>>> If size had been negative, it would have just returned null, so we
>>>>>>>> know the exception got thrown because the size was greater than the
>>>>>>>> buffer's capacity
>>>>>>>>
>>>>>>>>
>>>>>>>> I haven't seen that before... maybe a corrupted message of some
>>>>>>>> kind?
>>>>>>>>
>>>>>>>> If that problem is reproducible, try providing an explicit argument
>>>>>>>> for messageHandler, with a function that logs the message offset.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark Streaming Kafka method createDirectStream, everything is fine till a driver error happened (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this:
>>>>>>>>>
>>>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>>>>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>
>>>>>>>>> This is happening only when I'm doing a full data processing from
>>>>>>>>> Kafka. If there's no load, when you killed the driver and then restart, it
>>>>>>>>> resumes the checkpoint as expected without missing data. Did someone
>>>>>>>>> encounters something similar ? How did you solve this ?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Nicolas PHUNG
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Cody Koeninger <co...@koeninger.org>.
Yeah, I'm referring to that API.

If you want to filter messages in addition to catching that exception, have
your messageHandler return an Option, so the type R would end up being
Option[WhateverYourClassIs], then filter out the Nones before doing the rest
of your processing.
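
As a rough sketch of that shape (parse and MyRecord stand in for your real
decoder and class; the exception type is the one from your stack trace):

import kafka.message.MessageAndMetadata

case class MyRecord(bytes: Array[Byte])                   // placeholder class
def parse(bytes: Array[Byte]): MyRecord = MyRecord(bytes) // placeholder decoder

// R becomes Option[MyRecord]; a corrupt offset turns into None and is logged.
val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
  try Some(parse(mmd.message()))
  catch {
    case _: IllegalArgumentException =>
      System.err.println(
        s"skipping corrupt message at ${mmd.topic}-${mmd.partition} offset ${mmd.offset}")
      None
  }

// Then downstream: stream.filter(_.isDefined).map(_.get)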

If you aren't already recording offsets somewhere, and need to find the
offsets at the beginning of the topic, you can take a look at this

https://github.com/apache/spark/blob/branch-1.3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L143

as an example of querying offsets from Kafka.

That code is private, but you can either use it as an example, or remove
the private[spark] and recompile just the spark-streaming-kafka package.
That artifact is included in your job assembly, so you won't have to
redeploy spark if you go that route.
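
A sketch of that lookup, assuming you've made KafkaCluster visible one way or
the other (the broker and topic are placeholders; the calls are the ones in
the branch-1.3 file linked above):

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.KafkaCluster

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // placeholder
val kc = new KafkaCluster(kafkaParams)

// Either-based API: resolve the topic's partitions, then their earliest offsets.
val fromOffsets: Map[TopicAndPartition, Long] = (for {
  partitions <- kc.getPartitions(Set("my-topic")).right // placeholder topic
  leaders    <- kc.getEarliestLeaderOffsets(partitions).right
} yield leaders.map { case (tp, lo) => tp -> lo.offset }).fold(
  errs => throw new IllegalStateException(errs.toString),
  identity)

Those fromOffsets then go straight into the createDirectStream overload you
quoted.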


On Tue, Jul 21, 2015 at 6:42 AM, Nicolas Phung <ni...@gmail.com>
wrote:

> Hi Cody,
>
> Thanks for your answer. I'm with Spark 1.3.0. I don't quite understand how
> to use the messageHandler parameter/function in the createDirectStream
> method. You are referring to this, aren't you ?
>
> def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]:
> ClassTag, VD <: Decoder[V]: ClassTag, R: ClassTag] ( ssc: StreamingContext
> , kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition,
> Long], messageHandler: MessageAndMetadata[K, V] => R ): InputDStream[R] =
> { new DirectKafkaInputDStream[K, V, KD, VD, R]( ssc, kafkaParams,
> fromOffsets, messageHandler) }
>
> So, I must supply the fromOffsets parameter too, but how do I tell this
> method to read from the beginning of my topic ?
>
> If I have a filter (e.g. a R.date field) on my R class, I can put a filter
> in the messageHandler function too ?
>
> Regards,
> Nicolas P.
>
> On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> Yeah, in the function you supply for the messageHandler parameter to
>> createDirectStream, catch the exception and do whatever makes sense for
>> your application.
>>
>> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <ni...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> Using the old Spark Streaming Kafka API, I got the following around the
>>> same offset:
>>>
>>> kafka.message.InvalidMessageException: Message is corrupt (stored crc =
>>> 3561357254, computed crc = 171652633)
>>>         at kafka.message.Message.ensureValid(Message.scala:166)
>>>         at
>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>>         at
>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>         at
>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>>         at
>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>>         at
>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>         at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
>>> java.lang.IllegalStateException: Iterator is in failed state
>>>         at
>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>>         at
>>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>         at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> I found some old topic about some possible corrupt Kafka message
>>> produced by the new producer API with Snappy compression on. My question
>>> is, is it possible to skip/ignore those offsets when full processing with
>>> KafkaUtils.createStream or KafkaUtils.createDirectStream ?
>>>
>>> Regards,
>>> Nicolas PHUNG
>>>
>>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> I'd try logging the offsets for each message, see where problems start,
>>>> then try using the console consumer starting at those offsets and see if
>>>> you can reproduce the problem.
>>>>
>>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <nicolas.phung@gmail.com
>>>> > wrote:
>>>>
>>>>> Hi Cody,
>>>>>
>>>>> Thanks for you help. It seems there's something wrong with some
>>>>> messages within my Kafka topics then. I don't understand how, I can get
>>>>> bigger or incomplete message since I use default configuration to accept
>>>>> only 1Mb message in my Kafka topic. If you have any others informations or
>>>>> suggestions, please tell me.
>>>>>
>>>>> Regards,
>>>>> Nicolas PHUNG
>>>>>
>>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Not exactly the same issue, but possibly related:
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <co...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Well, working backwards down the stack trace...
>>>>>>>
>>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>
>>>>>>> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>>>>>>>
>>>>>>>
>>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>
>>>>>>> If size had been negative, it would have just returned null, so we
>>>>>>> know the exception got thrown because the size was greater than the
>>>>>>> buffer's capacity
>>>>>>>
>>>>>>>
>>>>>>> I haven't seen that before... maybe a corrupted message of some kind?
>>>>>>>
>>>>>>> If that problem is reproducible, try providing an explicit argument
>>>>>>> for messageHandler, with a function that logs the message offset.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark Streaming Kafka method createDirectStream, everything is fine till a driver error happened (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this:
>>>>>>>>
>>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>>>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>>>
>>>>>>>> This is happening only when I'm doing a full data processing from
>>>>>>>> Kafka. If there's no load, when you killed the driver and then restart, it
>>>>>>>> resumes the checkpoint as expected without missing data. Did someone
>>>>>>>> encounters something similar ? How did you solve this ?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Nicolas PHUNG
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Nicolas Phung <ni...@gmail.com>.
Hi Cody,

Thanks for your answer. I'm on Spark 1.3.0. I don't quite understand how to
use the messageHandler parameter/function in the createDirectStream method.
You are referring to this, aren't you?

def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag,
    R: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
  new DirectKafkaInputDStream[K, V, KD, VD, R](ssc, kafkaParams, fromOffsets, messageHandler)
}

So I must supply the fromOffsets parameter too, but how do I tell this method
to read from the beginning of my topic?

If I have a filter on my R class (e.g. on an R.date field), can I put that
filter in the messageHandler function too?

Regards,
Nicolas P.

On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Yeah, in the function you supply for the messageHandler parameter to
> createDirectStream, catch the exception and do whatever makes sense for
> your application.
>
> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <ni...@gmail.com>
> wrote:
>
>> Hello,
>>
>> Using the old Spark Streaming Kafka API, I got the following around the
>> same offset:
>>
>> kafka.message.InvalidMessageException: Message is corrupt (stored crc =
>> 3561357254, computed crc = 171652633)
>>         at kafka.message.Message.ensureValid(Message.scala:166)
>>         at
>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>         at
>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>         at
>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>         at
>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>         at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
>> java.lang.IllegalStateException: Iterator is in failed state
>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>         at
>> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>         at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> I found some old topic about some possible corrupt Kafka message produced
>> by the new producer API with Snappy compression on. My question is, is it
>> possible to skip/ignore those offsets when full processing with
>> KafkaUtils.createStream or KafkaUtils.createDirectStream ?
>>
>> Regards,
>> Nicolas PHUNG
>>
>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> I'd try logging the offsets for each message, see where problems start,
>>> then try using the console consumer starting at those offsets and see if
>>> you can reproduce the problem.
>>>
>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <ni...@gmail.com>
>>> wrote:
>>>
>>>> Hi Cody,
>>>>
>>>> Thanks for you help. It seems there's something wrong with some
>>>> messages within my Kafka topics then. I don't understand how, I can get
>>>> bigger or incomplete message since I use default configuration to accept
>>>> only 1Mb message in my Kafka topic. If you have any others informations or
>>>> suggestions, please tell me.
>>>>
>>>> Regards,
>>>> Nicolas PHUNG
>>>>
>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Not exactly the same issue, but possibly related:
>>>>>
>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>
>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Well, working backwards down the stack trace...
>>>>>>
>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>
>>>>>> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>>>>>>
>>>>>>
>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>
>>>>>> If size had been negative, it would have just returned null, so we
>>>>>> know the exception got thrown because the size was greater than the
>>>>>> buffer's capacity
>>>>>>
>>>>>>
>>>>>> I haven't seen that before... maybe a corrupted message of some kind?
>>>>>>
>>>>>> If that problem is reproducible, try providing an explicit argument
>>>>>> for messageHandler, with a function that logs the message offset.
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark Streaming Kafka method createDirectStream, everything is fine till a driver error happened (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this:
>>>>>>>
>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> This is happening only when I'm doing a full reprocessing of the
>>>>>>> data from Kafka. Under no load, if you kill the driver and then
>>>>>>> restart, it resumes from the checkpoint as expected without missing
>>>>>>> data. Has anyone encountered something similar? How did you solve it?
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Nicolas PHUNG
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Cody Koeninger <co...@koeninger.org>.
Yeah, in the function you supply for the messageHandler parameter to
createDirectStream, catch the exception and do whatever makes sense for
your application.
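
For what it's worth, a minimal sketch of that approach (my illustration,
not from the original reply; it assumes String keys and values, the Spark
1.3+ direct API, and placeholder ssc / kafkaParams / fromOffsets values):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Wrap the payload decode in a try/catch so one corrupt message doesn't
// fail the whole stage. Returning an Option lets flatMap drop bad records.
val safeHandler = (mmd: MessageAndMetadata[String, String]) =>
  try {
    Some((mmd.key, mmd.message)) // message() is the call that throws here
  } catch {
    case e: IllegalArgumentException =>
      // Presumably a corrupt message: record where it was, then skip it.
      System.err.println(
        s"Skipping corrupt message at ${mmd.topic}-${mmd.partition} offset ${mmd.offset}: $e")
      None
  }

val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, Option[(String, String)]](
  ssc, kafkaParams, fromOffsets, safeHandler)

val goodRecords = stream.flatMap(_.toSeq) // corrupt offsets are dropped

The trade-off is that the corrupt offsets are silently lost to the job, so
log them somewhere durable if you need to inspect or replay them later.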

On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <ni...@gmail.com>
wrote:

> Hello,
>
> Using the old Spark Streaming Kafka API, I got the following around the
> same offset:
>
> kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633)
>         at kafka.message.Message.ensureValid(Message.scala:166)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
> java.lang.IllegalStateException: Iterator is in failed state
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> I found an old thread about possibly corrupt Kafka messages produced by
> the new producer API with Snappy compression on. My question is: is it
> possible to skip/ignore those offsets when doing a full reprocessing with
> KafkaUtils.createStream or KafkaUtils.createDirectStream?
>
> Regards,
> Nicolas PHUNG
>
> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> I'd try logging the offsets for each message, see where problems start,
>> then try using the console consumer starting at those offsets and see if
>> you can reproduce the problem.
>>
>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <ni...@gmail.com>
>> wrote:
>>
>>> Hi Cody,
>>>
>>> Thanks for your help. It seems there's something wrong with some
>>> messages within my Kafka topics, then. I don't understand how I can get
>>> oversized or incomplete messages, since I use the default configuration,
>>> which accepts messages of at most 1 MB, in my Kafka topic. If you have
>>> any other information or suggestions, please tell me.
>>>
>>> Regards,
>>> Nicolas PHUNG
>>>
>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> Not exactly the same issue, but possibly related:
>>>>
>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>
>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Well, working backwards down the stack trace...
>>>>>
>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>
>>>>> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>>>>>
>>>>>
>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>
>>>>> If size had been negative, it would have just returned null, so we
>>>>> know the exception got thrown because the size was greater than the
>>>>> buffer's capacity
>>>>>
>>>>>
>>>>> I haven't seen that before... maybe a corrupted message of some kind?
>>>>>
>>>>> If that problem is reproducible, try providing an explicit argument
>>>>> for messageHandler, with a function that logs the message offset.
>>>>>
>>>>>
>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>> nicolas.phung@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> When I'm reprocessing the data from Kafka (about 40 GB) with the new Spark Streaming Kafka method createDirectStream, everything is fine until a driver error happens (driver killed, connection lost...). When the driver comes back up, it resumes processing from the checkpoint in HDFS. Except this time I get this:
>>>>>>
>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> This is happening only when I'm doing a full reprocessing of the
>>>>>> data from Kafka. Under no load, if you kill the driver and then
>>>>>> restart, it resumes from the checkpoint as expected without missing
>>>>>> data. Has anyone encountered something similar? How did you solve it?
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Nicolas PHUNG
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Nicolas Phung <ni...@gmail.com>.
Hello,

Using the old Spark Streaming Kafka API, I got the following around the
same offset:

kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633)
        at kafka.message.Message.ensureValid(Message.scala:166)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
java.lang.IllegalStateException: Iterator is in failed state
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I found an old thread about possibly corrupt Kafka messages produced by
the new producer API with Snappy compression on. My question is: is it
possible to skip/ignore those offsets when doing a full reprocessing with
KafkaUtils.createStream or KafkaUtils.createDirectStream?

Regards,
Nicolas PHUNG

On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <co...@koeninger.org> wrote:

> I'd try logging the offsets for each message, see where problems start,
> then try using the console consumer starting at those offsets and see if
> you can reproduce the problem.
>
> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <ni...@gmail.com>
> wrote:
>
>> Hi Cody,
>>
>> Thanks for your help. It seems there's something wrong with some
>> messages within my Kafka topics, then. I don't understand how I can get
>> oversized or incomplete messages, since I use the default configuration,
>> which accepts messages of at most 1 MB, in my Kafka topic. If you have
>> any other information or suggestions, please tell me.
>>
>> Regards,
>> Nicolas PHUNG
>>
>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Not exactly the same issue, but possibly related:
>>>
>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>
>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> Well, working backwards down the stack trace...
>>>>
>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>
>>>> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>>>>
>>>>
>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>
>>>> If size had been negative, it would have just returned null, so we know
>>>> the exception got thrown because the size was greater than the buffer's
>>>> capacity
>>>>
>>>>
>>>> I haven't seen that before... maybe a corrupted message of some kind?
>>>>
>>>> If that problem is reproducible, try providing an explicit argument for
>>>> messageHandler, with a function that logs the message offset.
>>>>
>>>>
>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>> nicolas.phung@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> When I'm reprocessing the data from Kafka (about 40 GB) with the new Spark Streaming Kafka method createDirectStream, everything is fine until a driver error happens (driver killed, connection lost...). When the driver comes back up, it resumes processing from the checkpoint in HDFS. Except this time I get this:
>>>>>
>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> This is happening only when I'm doing a full reprocessing of the
>>>>> data from Kafka. Under no load, if you kill the driver and then
>>>>> restart, it resumes from the checkpoint as expected without missing
>>>>> data. Has anyone encountered something similar? How did you solve it?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Nicolas PHUNG
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Cody Koeninger <co...@koeninger.org>.
I'd try logging the offsets for each message, see where problems start,
then try using the console consumer starting at those offsets and see if
you can reproduce the problem.
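
One wrinkle: the stock Kafka 0.8 console consumer can only start from the
beginning or the end of a partition, not an arbitrary offset, so to replay
a specific offset you may need kafka.tools.SimpleConsumerShell or a small
program against the SimpleConsumer API. A sketch of the latter (broker,
topic, partition, and offset values are placeholders, and the calls are
from memory -- verify them against your Kafka version):

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

// Fetch a single suspect offset directly, outside Spark, and check its CRC.
val consumer = new SimpleConsumer("broker1", 9092, 10000, 64 * 1024, "corruption-check")
try {
  val req = new FetchRequestBuilder()
    .clientId("corruption-check")
    .addFetch("mytopic", 4, 1234567L, 1024 * 1024) // topic, partition, offset, fetchSize
    .build()
  val resp = consumer.fetch(req)
  resp.messageSet("mytopic", 4).foreach { mo =>
    // isValid recomputes the CRC -- the same check ensureValid performs
    println(s"offset=${mo.offset} valid=${mo.message.isValid}")
  }
} finally {
  consumer.close()
}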

On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <ni...@gmail.com>
wrote:

> Hi Cody,
>
> Thanks for your help. It seems there's something wrong with some
> messages within my Kafka topics, then. I don't understand how I can get
> oversized or incomplete messages, since I use the default configuration,
> which accepts messages of at most 1 MB, in my Kafka topic. If you have
> any other information or suggestions, please tell me.
>
> Regards,
> Nicolas PHUNG
>
> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> Not exactly the same issue, but possibly related:
>>
>> https://issues.apache.org/jira/browse/KAFKA-1196
>>
>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Well, working backwards down the stack trace...
>>>
>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>
>>> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>>>
>>>
>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>
>>> If size had been negative, it would have just returned null, so we know
>>> the exception got thrown because the size was greater than the buffer's
>>> capacity
>>>
>>>
>>> I haven't seen that before... maybe a corrupted message of some kind?
>>>
>>> If that problem is reproducible, try providing an explicit argument for
>>> messageHandler, with a function that logs the message offset.
>>>
>>>
>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <nicolas.phung@gmail.com
>>> > wrote:
>>>
>>>> Hello,
>>>>
>>>> When I'm reprocessing the data from Kafka (about 40 GB) with the new Spark Streaming Kafka method createDirectStream, everything is fine until a driver error happens (driver killed, connection lost...). When the driver comes back up, it resumes processing from the checkpoint in HDFS. Except this time I get this:
>>>>
>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> This is happening only when I'm doing a full reprocessing of the
>>>> data from Kafka. Under no load, if you kill the driver and then
>>>> restart, it resumes from the checkpoint as expected without missing
>>>> data. Has anyone encountered something similar? How did you solve it?
>>>>
>>>> Regards,
>>>>
>>>> Nicolas PHUNG
>>>>
>>>
>>>
>>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Nicolas Phung <ni...@gmail.com>.
Hi Cody,

Thanks for your help. It seems there's something wrong with some
messages within my Kafka topics, then. I don't understand how I can get
oversized or incomplete messages, since I use the default configuration,
which accepts messages of at most 1 MB, in my Kafka topic. If you have
any other information or suggestions, please tell me.

Regards,
Nicolas PHUNG

On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Not exactly the same issue, but possibly related:
>
> https://issues.apache.org/jira/browse/KAFKA-1196
>
> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> Well, working backwards down the stack trace...
>>
>> at java.nio.Buffer.limit(Buffer.java:275)
>>
>> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>>
>>
>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>
>> If size had been negative, it would have just returned null, so we know
>> the exception got thrown because the size was greater than the buffer's
>> capacity
>>
>>
>> I haven't seen that before... maybe a corrupted message of some kind?
>>
>> If that problem is reproducible, try providing an explicit argument for
>> messageHandler, with a function that logs the message offset.
>>
>>
>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <ni...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> When I'm reprocessing the data from Kafka (about 40 GB) with the new Spark Streaming Kafka method createDirectStream, everything is fine until a driver error happens (driver killed, connection lost...). When the driver comes back up, it resumes processing from the checkpoint in HDFS. Except this time I get this:
>>>
>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>> 	at kafka.message.Message.payload(Message.scala:218)
>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> 	at java.lang.Thread.run(Thread.java:745)
>>>
>>> This is happening only when I'm doing a full reprocessing of the
>>> data from Kafka. Under no load, if you kill the driver and then
>>> restart, it resumes from the checkpoint as expected without missing
>>> data. Has anyone encountered something similar? How did you solve it?
>>>
>>> Regards,
>>>
>>> Nicolas PHUNG
>>>
>>
>>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Cody Koeninger <co...@koeninger.org>.
Not exactly the same issue, but possibly related:

https://issues.apache.org/jira/browse/KAFKA-1196

On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Well, working backwards down the stack trace...
>
> at java.nio.Buffer.limit(Buffer.java:275)
>
> That exception gets thrown if the limit is negative or greater than the buffer's capacity
>
>
> at kafka.message.Message.sliceDelimited(Message.scala:236)
>
> If size had been negative, it would have just returned null, so we know
> the exception got thrown because the size was greater than the buffer's
> capacity
>
>
> I haven't seen that before... maybe a corrupted message of some kind?
>
> If that problem is reproducible, try providing an explicit argument for
> messageHandler, with a function that logs the message offset.
>
>
> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <ni...@gmail.com>
> wrote:
>
>> Hello,
>>
>> When I'm reprocessing the data from Kafka (about 40 GB) with the new Spark Streaming Kafka method createDirectStream, everything is fine until a driver error happens (driver killed, connection lost...). When the driver comes back up, it resumes processing from the checkpoint in HDFS. Except this time I get this:
>>
>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>> 	at java.nio.Buffer.limit(Buffer.java:275)
>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>> 	at kafka.message.Message.payload(Message.scala:218)
>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> 	at java.lang.Thread.run(Thread.java:745)
>>
>> This is happening only when I'm doing a full reprocessing of the
>> data from Kafka. Under no load, if you kill the driver and then
>> restart, it resumes from the checkpoint as expected without missing
>> data. Has anyone encountered something similar? How did you solve it?
>>
>> Regards,
>>
>> Nicolas PHUNG
>>
>
>

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

Posted by Cody Koeninger <co...@koeninger.org>.
Well, working backwards down the stack trace...

at java.nio.Buffer.limit(Buffer.java:275)

That exception gets thrown if the limit is negative or greater than
the buffer's capacity
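
A two-line REPL illustration of that failure mode (added here for clarity,
not part of the original mail):

import java.nio.ByteBuffer

val b = ByteBuffer.allocate(16)
b.limit(8)    // fine: 8 <= capacity (16)
b.limit(1024) // throws a bare java.lang.IllegalArgumentException

That message-less exception is exactly what shows up in the trace above.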


at kafka.message.Message.sliceDelimited(Message.scala:236)

If size had been negative, it would have just returned null, so we know the
exception got thrown because the size was greater than the buffer's capacity


I haven't seen that before... maybe a corrupted message of some kind?

If that problem is reproducible, try providing an explicit argument for
messageHandler, with a function that logs the message offset.
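
Something along these lines (an illustrative sketch, not from the original
mail; it assumes String keys/values and placeholder ssc / kafkaParams /
fromOffsets values):

import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Log the offset *before* calling message(), which is the call that
// throws, so the last offset an executor logs points at the bad record.
val loggingHandler = (mmd: MessageAndMetadata[String, String]) => {
  System.err.println(s"decoding ${mmd.topic}-${mmd.partition} offset ${mmd.offset}")
  (mmd.key, mmd.message)
}

val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, loggingHandler)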


On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <ni...@gmail.com>
wrote:

> Hello,
>
> When I'm reprocessing the data from Kafka (about 40 GB) with the new Spark Streaming Kafka method createDirectStream, everything is fine until a driver error happens (driver killed, connection lost...). When the driver comes back up, it resumes processing from the checkpoint in HDFS. Except this time I get this:
>
> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
> 	at java.nio.Buffer.limit(Buffer.java:275)
> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
> 	at kafka.message.Message.payload(Message.scala:218)
> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
>
> This is happening only when I'm doing a full reprocessing of the
> data from Kafka. Under no load, if you kill the driver and then
> restart, it resumes from the checkpoint as expected without missing
> data. Has anyone encountered something similar? How did you solve it?
>
> Regards,
>
> Nicolas PHUNG
>