Posted to users@kafka.apache.org by Manish Khettry <ma...@ooyala.com> on 2012/03/27 01:43:25 UTC

Dealing with errors when using Kafka Consumer

We have a fairly simple class that runs in a loop, consumes
messages from Kafka, and feeds them to our stream processing system.

{
    .....
    consumerConnector = Consumer.create(new ConsumerConfig(props))
    val topicMessageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))

    // We only care about the first streamList from topicMessageStreams ...
    kafkaStream = topicMessageStreams(topic)(0)
    while (true) {
        val logMessage: String = Utils.toString(kafkaStream.head.payload, "UTF-8")
        // do stuff with the message.
    }
}

When this code gets an exception, it swallows it on the assumption
that the error is transient, and continues on its merry way. Obviously
this isn't the right thing to do in all cases (or perhaps in any
case): over a weekend, this code kept getting the same exception
and logged many hundreds of gigabytes of error messages before it
was restarted. The exception we were getting from Kafka was:

java.lang.IllegalStateException: Iterator is in failed state
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:46)
    at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:35)
    at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:38)

I was wondering which exceptions are transient and which ones need
special handling (say, reconnecting to Kafka, or just exiting the JVM
and having our job monitors restart the process). For example, with the
iterator in an invalid state, would creating a new connector have
helped? Any help would be appreciated.
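
For readers hitting the same runaway-logging problem, here is a minimal sketch of a consume loop that stops retrying instead of swallowing errors forever. It uses a plain Scala Iterator as a stand-in for the Kafka stream, and every name in it (ConsumeLoopSketch, consume, isPermanent, maxConsecutiveFailures) is hypothetical. Classifying IllegalStateException as permanent is an assumption about policy, not something the Kafka API dictates.

```scala
object ConsumeLoopSketch {
  sealed trait Outcome
  case object Finished extends Outcome
  final case class GaveUp(cause: Throwable, attempts: Int) extends Outcome

  // Assumption: the application decides which exceptions can never succeed on retry.
  def isPermanent(e: Throwable): Boolean = e match {
    case _: IllegalStateException => true
    case _                        => false
  }

  // Consumes until the iterator is exhausted, a permanent error occurs,
  // or maxConsecutiveFailures errors happen in a row.
  def consume[T](messages: Iterator[T], maxConsecutiveFailures: Int)(handle: T => Unit): Outcome = {
    var consecutiveFailures = 0
    var outcome: Option[Outcome] = None
    while (outcome.isEmpty) {
      try {
        if (messages.hasNext) {
          handle(messages.next())
          consecutiveFailures = 0 // a success resets the failure budget
        } else {
          outcome = Some(Finished)
        }
      } catch {
        case e: Exception =>
          consecutiveFailures += 1
          if (isPermanent(e) || consecutiveFailures >= maxConsecutiveFailures)
            outcome = Some(GaveUp(e, consecutiveFailures))
      }
    }
    outcome.get
  }

  def main(args: Array[String]): Unit = {
    val result = consume(Iterator("m1", "m2", "m3"), maxConsecutiveFailures = 3) { msg =>
      println(s"processing $msg")
    }
    println(result)
  }
}
```

On GaveUp, the caller could log the cause once and exit the JVM so that an external job monitor restarts the process. A sleep between retries would also be reasonable; it is left out here to keep the sketch short.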

Thanks,
Manish

Re: Dealing with errors when using Kafka Consumer

Posted by Jun Rao <ju...@gmail.com>.
Manish,

If you get IllegalStateException, this is a serious problem and it's
permanent. This typically means (1) the iterator is not used properly
(e.g., calling next before checking hasNext is true) or (2) a bug in Kafka.
We recently fixed the following issue in trunk. Not sure if this is exactly
your problem. Your usage of calling stream.head is a bit unusual. Normally,
people do "for (msg <- stream) { }".

https://issues.apache.org/jira/browse/KAFKA-241
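
To illustrate why a failed iterator stays failed, here is a simplified model of a template-style iterator. It is a sketch written for this thread, not Kafka's actual IteratorTemplate code, and the names SketchIterator, makeNext, and demo are hypothetical. The assumed design is that the state is marked as failed before the next item is computed, so an exception thrown while fetching leaves it failed for good.

```scala
object FailedIteratorSketch {
  sealed trait State
  case object NotReady extends State
  case object Ready extends State
  case object Done extends State
  case object Failed extends State

  abstract class SketchIterator[T] extends Iterator[T] {
    private var state: State = NotReady
    private var nextItem: Option[T] = None

    // Returns Some(item), or None when the stream is exhausted. May throw.
    protected def makeNext(): Option[T]

    override def hasNext: Boolean = state match {
      case Failed => throw new IllegalStateException("Iterator is in failed state")
      case Done   => false
      case Ready  => true
      case NotReady =>
        state = Failed // remains Failed if makeNext() throws
        nextItem = makeNext()
        state = if (nextItem.isDefined) Ready else Done
        nextItem.isDefined
    }

    override def next(): T = {
      if (!hasNext) throw new NoSuchElementException("no more items")
      state = NotReady
      nextItem.get
    }
  }

  // Calls next() five times and records either the item or the exception name.
  def demo(): List[String] = {
    val source = Iterator("a", "b", "boom", "c")
    val it = new SketchIterator[String] {
      protected def makeNext(): Option[String] =
        if (!source.hasNext) None
        else source.next() match {
          case "boom" => throw new RuntimeException("simulated fetch error")
          case s      => Some(s)
        }
    }
    (1 to 5).map { _ =>
      try it.next()
      catch { case e: Exception => e.getClass.getSimpleName }
    }.toList
  }

  def main(args: Array[String]): Unit = demo().foreach(println)
}
```

In this model the first failure surfaces the underlying error, and every later call reports only IllegalStateException, so retrying on the same iterator cannot succeed. Because SketchIterator extends Iterator, it also works with the usual "for (msg <- it) { }" form.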

Thanks,

Jun
