Posted to users@kafka.apache.org by pradeep s <sr...@gmail.com> on 2018/06/01 04:54:02 UTC

Kafka consumer loop exception handling

Hi,
I am running a poll loop for a Kafka consumer, and the app is deployed in
Kubernetes. I am using manual commits. I have a couple of questions on
exception handling in the poll loop:

1) Do I need to handle the consumer rebalance scenario (when any of the
consumer pods dies) by adding a listener, or will the commits be taken care
of after a rebalance?

2) Do I need to handle CommitFailedException specifically?

Consumer loop code below:


@Override
public void run() {
    try {
        do {
            processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
            kafkaConsumer.commitSync();
        } while (!isConsumerLoopClosed.get());
    } catch (WakeupException wakeupException) {
        //do nothing if wakeupException is from shutdown hook
        if (!isConsumerLoopClosed.get()) {
            handleConsumerLoopException(wakeupException);
        }
    } catch (RuntimeException ex) {
        handleConsumerLoopException(ex);
    } finally {
        kafkaConsumer.close();
    }
}

Thanks
Pradeep

Re: Kafka consumer loop exception handling

Posted by "M. Manna" <ma...@gmail.com>.
You should set auto.offset.reset to latest and commit offsets manually
using a rebalance listener. That way, after a seek() you should get all the
data correctly.

Also, when you say "uncommitted" offsets, that means you haven't really
processed them. So you should detect such failures, control offset commits
manually, and protect against duplication.
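A minimal sketch of the rebalance-listener approach suggested above,
assuming the kafka-clients library is on the classpath. The class and
field names (CommitOnRebalanceListener, currentOffsets, markProcessed) are
illustrative, not from the thread:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRebalanceListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> kafkaConsumer;
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    public CommitOnRebalanceListener(KafkaConsumer<String, String> kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    // Call this from the poll loop after each record is processed.
    public void markProcessed(TopicPartition partition, long offset) {
        // Commit offset + 1: the committed offset is the NEXT record to read.
        currentOffsets.put(partition, new OffsetAndMetadata(offset + 1));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit what we have processed before the partitions move to
        // another consumer, so the new owner does not reprocess them.
        kafkaConsumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Start fresh for the newly assigned partitions.
        currentOffsets.clear();
    }
}
```

The listener is registered at subscribe time, e.g.
kafkaConsumer.subscribe(topics, new CommitOnRebalanceListener(kafkaConsumer)).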



Regards,


On Fri, 1 Jun 2018, 18:32 pradeep s, <sr...@gmail.com> wrote:

> Than you. In my case i am receiving messages , doing a small transformation
> and sending to a output topic .
> If i am running 4 consumers against 4 partitions and one of the consumer
> dies , will there be duplicate messages sent in this case
> Since when the new consumer comes up , it will again process from the
> uncommitted offset .
> So do i need transaction semantics in this scenario.
>
>
> On Fri, Jun 1, 2018 at 4:56 AM, M. Manna <ma...@gmail.com> wrote:
>
> > This is actually quite nicely explained by Jason Gustafson on this
> article
> > -
> > https://www.confluent.io/blog/tutorial-getting-started-with-
> > the-new-apache-kafka-0-9-consumer-client/
> >
> > It's technically up to the application on how to determine whether
> message
> > is fully received. If you have database txn involved, I would say that
> > CommitFailedException should revert all changes you have done. Because
> you
> > couldn't commit the offset successfully, you haven't "Really" consumed
> any
> > message.
> >
> > Tailoring your code a little bit:
> >
> > @Override
> > public void run() {
> >     try {
> >         do {
> >             processRecords(kafkaConsumer.poll(kafkaConfig.
> > getPollTimeoutMs()));
> >             kafkaConsumer.commitSync();
> >         } while (!isConsumerLoopClosed.get());
> >     } catch (WakeupException wakeupException) {
> >         //do nothing if wakeupException is from shutdown hook
> >         if (!isConsumerLoopClosed.get()) {
> >             handleConsumerLoopException(wakeupException);
> >         }
> >     } catch (RuntimeException ex) { // RuntimeException could also happen
> > for other reasons here
> >         if (ex instanceof CommitFailedException) {
> >             // revert db txn etc. to avoid false positives
> >         } else if (ex instanceof KafkaException) {
> >             // do something else.
> >         } else {
> >            // alternatively, do this
> >         }
> >         handleConsumerLoopException(ex);
> >     } finally {
> >         kafkaConsumer.close();
> >     }
> >
> > }
> >
> > One thing to remember is that when you are sending data, as of 1.0.0 API
> > you can have a "Txn-like" finer control to determine when you have
> > successfully committed a transaction. You can check beginTransaction(),
> > commitTransaction(), abortTransaction() methods to see how they can be
> > utilised to have even finer control over your message delivery.
> >
> > Regards,
> >
> >
> > On 1 June 2018 at 05:54, pradeep s <sr...@gmail.com> wrote:
> >
> > > Hi,
> > > I am running a poll loop for kafka consumer and the app is deployed in
> > > kubernetes.I am using manual commits.Have couple of questions on
> > exception
> > > handling in the poll loop
> > >
> > > 1) Do i need to handle consumer rebalance scenario(when any of the
> > consumer
> > > pod dies) by adding a listener or will the commits be taken care after
> > > rebalance .
> > >
> > > 2) Do i need to handle CommitFailedException specifically
> > >
> > > Consume loop code below
> > >
> > >
> > > @Override
> > > public void run() {
> > >     try {
> > >         do {
> > >             processRecords(kafkaConsumer.poll(kafkaConfig.
> > > getPollTimeoutMs()));
> > >             kafkaConsumer.commitSync();
> > >         } while (!isConsumerLoopClosed.get());
> > >     } catch (WakeupException wakeupException) {
> > >         //do nothing if wakeupException is from shutdown hook
> > >         if (!isConsumerLoopClosed.get()) {
> > >             handleConsumerLoopException(wakeupException);
> > >         }
> > >     } catch (RuntimeException ex) {
> > >         handleConsumerLoopException(ex);
> > >     } finally {
> > >         kafkaConsumer.close();
> > >     }
> > >
> > >
> > > }
> > >
> > > Thanks
> > > Pradeep
> > >
> >
>

Re: Kafka consumer loop exception handling

Posted by pradeep s <sr...@gmail.com>.
Thank you. In my case I am receiving messages, doing a small
transformation, and sending to an output topic.
If I am running 4 consumers against 4 partitions and one of the consumers
dies, will duplicate messages be sent in this case? Since when the new
consumer comes up, it will again process from the uncommitted offset.
So do I need transaction semantics in this scenario?


On Fri, Jun 1, 2018 at 4:56 AM, M. Manna <ma...@gmail.com> wrote:


Re: Kafka consumer loop exception handling

Posted by "M. Manna" <ma...@gmail.com>.
This is actually quite nicely explained by Jason Gustafson in this article:
https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

It's technically up to the application to determine whether a message is
fully received. If you have a database txn involved, I would say that
CommitFailedException should revert all the changes you have made: because
you couldn't commit the offset successfully, you haven't "really" consumed
any message.

Tailoring your code a little bit:

@Override
public void run() {
    try {
        do {
            processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
            kafkaConsumer.commitSync();
        } while (!isConsumerLoopClosed.get());
    } catch (WakeupException wakeupException) {
        // do nothing if the WakeupException came from the shutdown hook
        if (!isConsumerLoopClosed.get()) {
            handleConsumerLoopException(wakeupException);
        }
    } catch (CommitFailedException commitFailedException) {
        // revert the db txn etc. to avoid false positives
        handleConsumerLoopException(commitFailedException);
    } catch (KafkaException kafkaException) {
        // handle other client-side Kafka errors
        handleConsumerLoopException(kafkaException);
    } catch (RuntimeException ex) {
        // a RuntimeException can also happen here for non-Kafka reasons
        handleConsumerLoopException(ex);
    } finally {
        kafkaConsumer.close();
    }
}

One thing to remember: when you are sending data, as of the 1.0.0 API you
can have transaction-like finer control to determine when you have
successfully committed a transaction. Check the beginTransaction(),
commitTransaction(), and abortTransaction() methods to see how they can be
used for even finer control over your message delivery.
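A minimal sketch of the transactional-producer flow mentioned above,
assuming the kafka-clients library and a running broker. The topic name,
transactional.id, and record values are illustrative only:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class TransactionalSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // A transactional.id is required to use the transaction methods.
        props.put("transactional.id", "transform-app-1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Registers the producer and fences any older producer instance
        // that used the same transactional.id.
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("output-topic", "key", "transformed-value"));
            // For an exactly-once consume-transform-produce loop, the
            // consumed offsets would also be committed inside the
            // transaction via sendOffsetsToTransaction(...).
            producer.commitTransaction();
        } catch (KafkaException e) {
            // None of the records sent in this transaction become visible
            // to read_committed consumers.
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}
```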

Regards,


On 1 June 2018 at 05:54, pradeep s <sr...@gmail.com> wrote:
