You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by "Sangchoei, Yuttana" <Yu...@anz.com.INVALID> on 2022/11/09 10:26:06 UTC

Offset in OffsetRepositoy doesn't get increase after manual commit

Hi All,

I'm upgrading Camel from 3.16 to 3.17 and found a problem that OffsetRepository didn't get update after manual commit.
It is a Spring boot project and MemoryStateRepository as follows.

        @Bean
        public MemoryStateRepository offsetRepo() {
                MemoryStateRepository stateRepository = new MemoryStateRepository();
                stateRepository.setState(topic + "/0", "");
                return stateRepository;
        }

Here is the method I use when commit:

        private void commitOffsetToKafka(Exchange exchange) {
                KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                manual.commit();
        }

Here is my Kafka endpoint

kafka:integration_test _topic?brokers=127.0.0.1:41478&autoCommitEnable=false&allowManualCommit=true&seekTo=beginning&maxPollRecords=2&groupId=ANE&autoOffsetReset=earliest&offsetRepository=#offsetRepo&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory

It is working fine in 3.16. When DefaultkafkaManualSyncCommit.commit() is invoked. The offsetRepository.setState() get invoked so the offset increased.
However, in 3.17, DefaultkafkaManualSyncCommit.commit() doesn't have a reference to offsetRepository so the offset in offsetRepository won't get increased.

Is this a bug or there is a different way to get the offset after committing. Could you please suggest what I should do to test if the offset get increased after manual commit?

Best regards,
Yuttana Sangchoei

This e-mail and any attachments to it (the "Communication") is, unless otherwise stated, confidential, may contain copyright material and is for the use only of the intended recipient. If you receive the Communication in error, please notify the sender immediately by return e-mail, delete the Communication and the return e-mail, and do not read, copy, retransmit or otherwise deal with it. Any views expressed in the Communication are those of the individual sender only, unless expressly stated to be those of Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or any of its related entities including ANZ Bank New Zealand Limited (together "ANZ"). ANZ does not accept liability in connection with the integrity of or errors in the Communication, computer virus, data corruption, interference or delay arising from or in respect of the Communication.

Re: Offset in OffsetRepositoy doesn't get increase after manual commit

Posted by Otavio Rodolfo Piske <an...@gmail.com>.
Hi, before I go ahead and investigate this further ... Could you possibly
try with Camel 3.18.3? We have made a bunch of fixes for Kafka in 3.18.3.

Kind regards

On Wed, Nov 9, 2022 at 3:56 PM Claus Ibsen <cl...@gmail.com> wrote:

> Hi
>
> I am not sure, there are some improvements and refactorings going on in
> camel-kafka and offset management.
> I think Otavio would be better to help look into this.
>
> On Wed, Nov 9, 2022 at 11:36 AM Sangchoei, Yuttana
> <Yu...@anz.com.invalid> wrote:
>
> > Hi All,
> >
> > I'm upgrading Camel from 3.16 to 3.17 and found a problem that
> > OffsetRepository didn't get update after manual commit.
> > It is a Spring boot project and MemoryStateRepository as follows.
> >
> >         @Bean
> >         public MemoryStateRepository offsetRepo() {
> >                 MemoryStateRepository stateRepository = new
> > MemoryStateRepository();
> >                 stateRepository.setState(topic + "/0", "");
> >                 return stateRepository;
> >         }
> >
> > Here is the method I use when commit:
> >
> >         private void commitOffsetToKafka(Exchange exchange) {
> >                 KafkaManualCommit manual =
> > exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
> > KafkaManualCommit.class);
> >                 manual.commit();
> >         }
> >
> > Here is my Kafka endpoint
> >
> > kafka:integration_test _topic?brokers=127.0.0.1:41478
> >
> &autoCommitEnable=false&allowManualCommit=true&seekTo=beginning&maxPollRecords=2&groupId=ANE&autoOffsetReset=earliest&offsetRepository=#offsetRepo&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory
> >
> > It is working fine in 3.16. When DefaultkafkaManualSyncCommit.commit() is
> > invoked. The offsetRepository.setState() get invoked so the offset
> > increased.
> > However, in 3.17, DefaultkafkaManualSyncCommit.commit() doesn't have a
> > reference to offsetRepository so the offset in offsetRepository won't get
> > increased.
> >
> > Is this a bug or there is a different way to get the offset after
> > committing. Could you please suggest what I should do to test if the
> offset
> > get increased after manual commit?
> >
> > Best regards,
> > Yuttana Sangchoei
> >
> > This e-mail and any attachments to it (the "Communication") is, unless
> > otherwise stated, confidential, may contain copyright material and is for
> > the use only of the intended recipient. If you receive the Communication
> in
> > error, please notify the sender immediately by return e-mail, delete the
> > Communication and the return e-mail, and do not read, copy, retransmit or
> > otherwise deal with it. Any views expressed in the Communication are
> those
> > of the individual sender only, unless expressly stated to be those of
> > Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or
> any
> > of its related entities including ANZ Bank New Zealand Limited (together
> > "ANZ"). ANZ does not accept liability in connection with the integrity of
> > or errors in the Communication, computer virus, data corruption,
> > interference or delay arising from or in respect of the Communication.
> >
>
>
> --
> Claus Ibsen
> -----------------
> @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>


-- 
Otavio R. Piske
http://orpiske.net

Re: Offset in OffsetRepositoy doesn't get increase after manual commit

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

I am not sure, there are some improvements and refactorings going on in
camel-kafka and offset management.
I think Otavio would be better to help look into this.

On Wed, Nov 9, 2022 at 11:36 AM Sangchoei, Yuttana
<Yu...@anz.com.invalid> wrote:

> Hi All,
>
> I'm upgrading Camel from 3.16 to 3.17 and found a problem that
> OffsetRepository didn't get update after manual commit.
> It is a Spring boot project and MemoryStateRepository as follows.
>
>         @Bean
>         public MemoryStateRepository offsetRepo() {
>                 MemoryStateRepository stateRepository = new
> MemoryStateRepository();
>                 stateRepository.setState(topic + "/0", "");
>                 return stateRepository;
>         }
>
> Here is the method I use when commit:
>
>         private void commitOffsetToKafka(Exchange exchange) {
>                 KafkaManualCommit manual =
> exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
> KafkaManualCommit.class);
>                 manual.commit();
>         }
>
> Here is my Kafka endpoint
>
> kafka:integration_test _topic?brokers=127.0.0.1:41478
> &autoCommitEnable=false&allowManualCommit=true&seekTo=beginning&maxPollRecords=2&groupId=ANE&autoOffsetReset=earliest&offsetRepository=#offsetRepo&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory
>
> It is working fine in 3.16. When DefaultkafkaManualSyncCommit.commit() is
> invoked. The offsetRepository.setState() get invoked so the offset
> increased.
> However, in 3.17, DefaultkafkaManualSyncCommit.commit() doesn't have a
> reference to offsetRepository so the offset in offsetRepository won't get
> increased.
>
> Is this a bug or there is a different way to get the offset after
> committing. Could you please suggest what I should do to test if the offset
> get increased after manual commit?
>
> Best regards,
> Yuttana Sangchoei
>
> This e-mail and any attachments to it (the "Communication") is, unless
> otherwise stated, confidential, may contain copyright material and is for
> the use only of the intended recipient. If you receive the Communication in
> error, please notify the sender immediately by return e-mail, delete the
> Communication and the return e-mail, and do not read, copy, retransmit or
> otherwise deal with it. Any views expressed in the Communication are those
> of the individual sender only, unless expressly stated to be those of
> Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or any
> of its related entities including ANZ Bank New Zealand Limited (together
> "ANZ"). ANZ does not accept liability in connection with the integrity of
> or errors in the Communication, computer virus, data corruption,
> interference or delay arising from or in respect of the Communication.
>


-- 
Claus Ibsen
-----------------
@davsclaus
Camel in Action 2: https://www.manning.com/ibsen2