You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Chen Wang <ch...@gmail.com> on 2014/08/12 23:27:34 UTC

Correct way to handle ConsumerTimeoutException

Folks,
I am using consumer.timeout.ms to force a consumer jump out hasNext call,
which will throw ConsumerTimeoutException. It seems that upon receiving
this exception, the consumer is no longer usable and I need to call
.shutdown, and recreate:

try{
} catch (ConsumerTimeoutException ex) {

 logger.info("consumer timeout, we consider the topic is drained");

 this.consumer.shutdown();

this.consumer = kafka.consumer.Consumer

  .createJavaConsumerConnector(new ConsumerConfig(

   this.consumerProperties));

 }


Is this the expected behavior? I call

this.consumer = kafka.consumer.Consumer

  .createJavaConsumerConnector(new ConsumerConfig(

   this.consumerProperties));

in the thread initialization phase, and hope to reuse it upon
ConsumerTimeoutException

Thanks,

Chen

Re: Correct way to handle ConsumerTimeoutException

Posted by Chen Wang <ch...@gmail.com>.
Got it.
Thanks guys!
Chen


On Wed, Aug 13, 2014 at 9:35 AM, Neha Narkhede <ne...@gmail.com>
wrote:

> I am using consumer.timeout.ms to force a consumer jump out hasNext call,
> which will throw ConsumerTimeoutException.
>
> Yes, this is the downside of the blocking iterator approach. If you want to
> pull data in batches and process messages, the iterator is not the best API
> as it can block at any time longer than your app is comfortable with. To
> fix this and a bunch of other problems with the consumer API, we are
> working on a new consumer client, that will replace the existing one and
> will support completely new APIs.
>
> Take a look at the new APIs here
> <
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> >.
> Let us know if you have feedback.
>
>
> On Tue, Aug 12, 2014 at 4:37 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi Chen,
> >
> > The rational of using the consumer timeout exception is to indicate when
> > there is no more data to be consumed, and hence upon capturing the
> > exception the consumer should be closed.
> >
> > If you want to restart the consumer in handling the timeout exception,
> then
> > you should probably just increasing the timeout value in the configs to
> > avoid it throwing timeout exception.
> >
> > Guozhang
> >
> >
> > On Tue, Aug 12, 2014 at 2:27 PM, Chen Wang <ch...@gmail.com>
> > wrote:
> >
> > > Folks,
> > > I am using consumer.timeout.ms to force a consumer jump out hasNext
> > call,
> > > which will throw ConsumerTimeoutException. It seems that upon receiving
> > > this exception, the consumer is no longer usable and I need to call
> > > .shutdown, and recreate:
> > >
> > > try{
> > > } catch (ConsumerTimeoutException ex) {
> > >
> > >  logger.info("consumer timeout, we consider the topic is drained");
> > >
> > >  this.consumer.shutdown();
> > >
> > > this.consumer = kafka.consumer.Consumer
> > >
> > >   .createJavaConsumerConnector(new ConsumerConfig(
> > >
> > >    this.consumerProperties));
> > >
> > >  }
> > >
> > >
> > > Is this the expected behavior? I call
> > >
> > > this.consumer = kafka.consumer.Consumer
> > >
> > >   .createJavaConsumerConnector(new ConsumerConfig(
> > >
> > >    this.consumerProperties));
> > >
> > > in the thread initialization phase, and hope to reuse it upon
> > > ConsumerTimeoutException
> > >
> > > Thanks,
> > >
> > > Chen
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: Correct way to handle ConsumerTimeoutException

Posted by Neha Narkhede <ne...@gmail.com>.
I am using consumer.timeout.ms to force a consumer jump out hasNext call,
which will throw ConsumerTimeoutException.

Yes, this is the downside of the blocking iterator approach. If you want to
pull data in batches and process messages, the iterator is not the best API
as it can block at any time longer than your app is comfortable with. To
fix this and a bunch of other problems with the consumer API, we are
working on a new consumer client, that will replace the existing one and
will support completely new APIs.

Take a look at the new APIs here
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html>.
Let us know if you have feedback.


On Tue, Aug 12, 2014 at 4:37 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Chen,
>
> The rational of using the consumer timeout exception is to indicate when
> there is no more data to be consumed, and hence upon capturing the
> exception the consumer should be closed.
>
> If you want to restart the consumer in handling the timeout exception, then
> you should probably just increasing the timeout value in the configs to
> avoid it throwing timeout exception.
>
> Guozhang
>
>
> On Tue, Aug 12, 2014 at 2:27 PM, Chen Wang <ch...@gmail.com>
> wrote:
>
> > Folks,
> > I am using consumer.timeout.ms to force a consumer jump out hasNext
> call,
> > which will throw ConsumerTimeoutException. It seems that upon receiving
> > this exception, the consumer is no longer usable and I need to call
> > .shutdown, and recreate:
> >
> > try{
> > } catch (ConsumerTimeoutException ex) {
> >
> >  logger.info("consumer timeout, we consider the topic is drained");
> >
> >  this.consumer.shutdown();
> >
> > this.consumer = kafka.consumer.Consumer
> >
> >   .createJavaConsumerConnector(new ConsumerConfig(
> >
> >    this.consumerProperties));
> >
> >  }
> >
> >
> > Is this the expected behavior? I call
> >
> > this.consumer = kafka.consumer.Consumer
> >
> >   .createJavaConsumerConnector(new ConsumerConfig(
> >
> >    this.consumerProperties));
> >
> > in the thread initialization phase, and hope to reuse it upon
> > ConsumerTimeoutException
> >
> > Thanks,
> >
> > Chen
> >
>
>
>
> --
> -- Guozhang
>

Re: Correct way to handle ConsumerTimeoutException

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Chen,

The rational of using the consumer timeout exception is to indicate when
there is no more data to be consumed, and hence upon capturing the
exception the consumer should be closed.

If you want to restart the consumer in handling the timeout exception, then
you should probably just increasing the timeout value in the configs to
avoid it throwing timeout exception.

Guozhang


On Tue, Aug 12, 2014 at 2:27 PM, Chen Wang <ch...@gmail.com>
wrote:

> Folks,
> I am using consumer.timeout.ms to force a consumer jump out hasNext call,
> which will throw ConsumerTimeoutException. It seems that upon receiving
> this exception, the consumer is no longer usable and I need to call
> .shutdown, and recreate:
>
> try{
> } catch (ConsumerTimeoutException ex) {
>
>  logger.info("consumer timeout, we consider the topic is drained");
>
>  this.consumer.shutdown();
>
> this.consumer = kafka.consumer.Consumer
>
>   .createJavaConsumerConnector(new ConsumerConfig(
>
>    this.consumerProperties));
>
>  }
>
>
> Is this the expected behavior? I call
>
> this.consumer = kafka.consumer.Consumer
>
>   .createJavaConsumerConnector(new ConsumerConfig(
>
>    this.consumerProperties));
>
> in the thread initialization phase, and hope to reuse it upon
> ConsumerTimeoutException
>
> Thanks,
>
> Chen
>



-- 
-- Guozhang