Posted to users@kafka.apache.org by Nicolas Phung <ni...@gmail.com> on 2015/07/21 09:38:36 UTC

Issue with corrupt message in Topic

Hello,

I'm using Confluent Kafka (0.8.2.0-cp). When I try to process messages
from my Kafka topic with Spark Streaming, I get the following error:

kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633)
        at kafka.message.Message.ensureValid(Message.scala:166)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
java.lang.IllegalStateException: Iterator is in failed state
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

From my understanding, there is a corrupt message in my topic. I'm using
the new Producer API to send messages compressed with Snappy. I found an old
thread about this, but it gave no further steps to resolve the issue. Do
you have any information regarding this?

Is it possible in Kafka to somehow reread the topic and drop corrupt
messages?
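
For reference, the "stored crc" and "computed crc" values in the exception come from a checksum comparison: a CRC32 is stored with each message, and the consumer recomputes it over the bytes it received. The snippet below is only a minimal model of that comparison in Python. The two-field layout (4-byte CRC followed by the payload) and the names encode_message and check_message are assumptions made for illustration; the real Kafka message format has more fields, and this is not Kafka's own code.

```python
import struct
import zlib


def encode_message(value: bytes) -> bytes:
    """Hypothetical simplified layout: 4-byte big-endian CRC32, then the payload it covers."""
    return struct.pack(">I", zlib.crc32(value) & 0xFFFFFFFF) + value


def check_message(raw: bytes):
    """Return (stored_crc, computed_crc, is_valid) for one encoded message."""
    (stored,) = struct.unpack(">I", raw[:4])
    computed = zlib.crc32(raw[4:]) & 0xFFFFFFFF
    return stored, computed, stored == computed


good = encode_message(b"hello kafka")
# Flip one bit in the payload to simulate corruption on disk or in transit.
bad = good[:-1] + bytes([good[-1] ^ 0x01])

for name, raw in (("good", good), ("bad", bad)):
    stored, computed, ok = check_message(raw)
    print(f"{name}: stored crc = {stored}, computed crc = {computed}, valid = {ok}")
```

In this model the stored value is unchanged by the corruption while the computed value differs, which is the same kind of mismatch the exception above reports.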

Regards,
Nicolas PHUNG

Re: Issue with corrupt message in Topic

Posted by Adam Dubiel <du...@gmail.com>.
Hi Nicolas,

From my experience there are only two ways out:
1) wait for the retention time to pass, so the data gets deleted (this is usually
unacceptable)
2) trace the offset of the corrupt message on all affected subscriptions and skip
the message by overwriting the stored offset with offset+1

The problem is that when it encounters a corrupt message, the high-level consumer
iterator goes into an invalid state and closes. There is no way to skip the
message or recover from it without skipping offsets. You might try using
SimpleConsumer, though. Maybe someone knows other ways to deal with this
problem, but we haven't found any.
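
To illustrate the "skip to offset+1" idea, here is a rough, self-contained Python model of a reader that validates each record and steps past corrupt ones instead of failing. It does not use the Kafka SimpleConsumer API; the in-memory log and the names make_record, is_valid and consume_skipping_corrupt are hypothetical and only show the control flow a lower-level consumer would need.

```python
import zlib


def make_record(offset, value):
    """Build a hypothetical log record that carries the CRC32 of its value."""
    return {"offset": offset, "crc": zlib.crc32(value) & 0xFFFFFFFF, "value": value}


def is_valid(record):
    """Recompute the CRC32 and compare it with the stored one."""
    return (zlib.crc32(record["value"]) & 0xFFFFFFFF) == record["crc"]


def consume_skipping_corrupt(log, start_offset=0):
    """Read records from start_offset onwards.

    Returns (good values, skipped offsets, next offset to read).
    """
    values, skipped = [], []
    next_offset = start_offset
    for record in log:
        if record["offset"] < start_offset:
            continue
        if is_valid(record):
            values.append(record["value"])
        else:
            skipped.append(record["offset"])
        # Valid or not, move past this record: the new position is offset + 1.
        next_offset = record["offset"] + 1
    return values, skipped, next_offset


log = [make_record(i, f"event-{i}".encode()) for i in range(5)]
log[2]["value"] = b"event-X"  # corrupt offset 2 without updating its stored CRC

values, skipped, next_offset = consume_skipping_corrupt(log)
print(values, skipped, next_offset)
```

The skipped offsets are worth logging, since the data at those positions is lost to every subscription that jumps over them.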

BR,
Adam
