Posted to users@camel.apache.org by Otavio Rodolfo Piske <an...@gmail.com> on 2022/11/02 18:00:03 UTC

Re: Camel and Kafka manual commit : java.util.ConcurrentModificationException

Hi Ivan,

For Kafka (consumer) objects, yes, they should be created for every route.
I will try to take a look at it during this week or the next.
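
Each route instantiated from the template gets its own endpoint instance and,
with it, its own consumer. As a rough sketch (assuming the standard
addRouteFromTemplate API; the parameter values below are made up and only the
"topic" and "publisherId" parameters from your template are shown):

import java.util.Map;
import org.apache.camel.CamelContext;

public class RouteCreator {
    public static void create(CamelContext camelContext) throws Exception {
        // two routes from the same template, each ending up with its own KafkaConsumer
        camelContext.addRouteFromTemplate("route-1", Constantes.KAFKA_GENERIC_ROUTE,
                Map.of("topic", "some-topic", "publisherId", "publisher-1")); // other parameters omitted
        camelContext.addRouteFromTemplate("route-2", Constantes.KAFKA_GENERIC_ROUTE,
                Map.of("topic", "some-topic", "publisherId", "publisher-2"));
    }
}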

In the meantime, can you please tell me which version of Camel you are
using?

Kind regards

On Wed, Oct 26, 2022 at 6:50 PM Ivan <iv...@gmail.com> wrote:

> Hi Otavio, thank you for your response.
>
> The commit is made synchronously by the kafkaOffsetProcessor (whose code I
> forgot to attach, more details here
> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception).
> Basically it does what the documentation says for synchronous commits.
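>
> For reference, it is essentially the pattern from the Camel documentation,
> roughly like this (a simplified sketch, not the exact class; on Camel 3.17 the
> KafkaManualCommit interface is in org.apache.camel.component.kafka, on 3.18+ it
> moved to the consumer subpackage):
>
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.component.kafka.KafkaConstants;
> import org.apache.camel.component.kafka.KafkaManualCommit;
>
> public class KafkaOffsetProcessor implements Processor {
>     @Override
>     public void process(Exchange exchange) throws Exception {
>         // the Kafka consumer stores a KafkaManualCommit instance in this header
>         // when allowManualCommit=true is set on the endpoint
>         KafkaManualCommit manual = exchange.getIn()
>                 .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
>         if (manual != null) {
>             manual.commitSync(); // synchronous commit of the current offset
>         }
>     }
> }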
>
> Multiple threads cannot access the same Kafka client, that much is clear, but
> Camel handles the instances and threads, so it is not so easy for me to fix.
>
> I mention SEDA because I know that, for SEDA endpoints, objects are pooled
> (if my understanding is right).
>
> Are you sure that new objects are created for every route created from a
> template?
>
> Ivan Rododendro
>
> > On 24 Oct 2022 at 15:40, Otavio Rodolfo Piske <an...@gmail.com> wrote:
> >
> > Hi,
> >
> > From the code you provided, it's not very clear to me when and where you
> > are calling the commit. It's also not clear to me which version of Camel
> > you are using and which kind of commit factory you are using (async [1] or
> > sync [2]?).
> >
> > That said... the problem here is that - as explained in the exception
> > message - the Kafka client cannot be accessed from a different thread.
> >
> > So, I am not entirely sure that the problem is related to SEDA or something
> > like that. Also, Camel will indeed create a different consumer for every
> > route.
> >
> > Please, can you provide a bit more details about the code you have?
> >
> > 1. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> > 2. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
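> >
> > (For reference, the factory is the kafkaManualCommitFactory option on the Kafka
> > component/endpoint. A rough sketch of how such an option is typically wired up,
> > purely illustrative -- "myCommitFactory" stands for any bean implementing the
> > component's KafkaManualCommitFactory interface:)
> >
> > // bind the factory bean in the registry (the default factory commits synchronously;
> > // an asynchronous implementation ships with the component as well)
> > camelContext.getRegistry().bind("myCommitFactory", myCommitFactoryInstance);
> > // then reference it from the Kafka endpoint URI:
> > // kafka:myTopic?allowManualCommit=true&autoCommitEnable=false&kafkaManualCommitFactory=#myCommitFactory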
> >
> >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <
> ivan.rododendro@gmail.com>
> >> wrote:
> >>
> >> Hello,
> >> I'm really new to Camel concepts. Our need is to create some identical
> >> routes, identical except for some parameters, from a Kafka topic to an
> >> HTTP endpoint, with some processing in-between.
> >>
> >> Besides this, we want to explicitly commit the message consumption only
> >> when the HTTP endpoint has been successfully called.
> >>
> >> In order to achieve this we set up a route template that carries the
> >> route parameterization and set it up to manually commit after having
> >> called the HTTP endpoint:
> >> public void configure() throws Exception {
> >>        // @formatter:off
> >>        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> >>            .templateParameter(Constantes.JOB_NAME)
> >>            .templateParameter(Constantes.TOPIC)
> >>            .templateParameter(Constantes.PUBLISHER_ID)
> >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> >>            .templateParameter(Constantes.JOB_NAME_PARAMETER)
> >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> >>            .from(getKafkaEndpoint())
> >>            .messageHistory()
> >>            .filter(simple("${header.publisherId} == '{{publisherId}}'"))
> >>            .process(messageLoggerProcessor)
> >>            .process(modelMapperProcessor)
> >>            .process(jsonlToArrayProcessor)
> >>            .process(payloadProcessor)
> >>            .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> >>            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> >>            .setHeader(Exchange.CONTENT_TYPE,
> >> constant("application/json;charset=UTF-8"))
> >>            .setHeader(Constantes.ACCEPT,constant("application/json"))
> >>            .setHeader(Constantes.API_KEY, constant(apiKey))
> >>            .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> >>            .process(apiConsumerProcessorLogger)
> >>            .to(this.url)
> >>            .process(kafkaOffsetProcessor);
> >>        // @formatter:on
> >>    }
> >>
> >>    private String getKafkaEndpoint() {
> >>        String endpoint =
> >>                "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
> >>                + kafkaBrokers;
> >>
> >>        if (securityEnabled()) {
> >>            endpoint += "&securityProtocol=SASL_SSL"
> >>                    + "&saslMechanism=PLAIN"
> >>                    + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
> >>                    + username + "\" password=\"" + password + "\";"
> >>                    + "&sslTruststoreLocation=" + sslTrustStoreLocation
> >>                    + "&sslTruststorePassword=" + sslTruststorePassword;
> >>        }
> >>
> >>        return endpoint;
> >>    }
> >>
> >> The problem is that we systematically get this error when a message is
> >> consumed by a route:
> >>
> >> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not
> >> safe for multi-threaded access
> >>    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> >>    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> >>    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> >>    at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> >>    at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> >>
> >>
> >> My understanding is that the instance of KafkaConsumer is reused in
> >> multiple routes and therefore it generates the error, but it could also be
> >> related to using a SEDA endpoint as stated here
> >> (https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
> >> explicitly do.
> >>
> >> We tried injecting a KafkaComponent local bean in the route:
> >>
> >>            .templateBean("myKafkaConfiguration")
> >>                .typeClass("org.apache.camel.component.kafka.KafkaConfiguration")
> >>                .property("topic", "{{" + Constantes.TOPIC + "}}")
> >>                .properties(kafkaConfiguration)
> >>            .end()
> >>            .templateBean("myKafka")
> >>                .typeClass("org.apache.camel.component.kafka.KafkaComponent")
> >>                .property("configuration", "#{{myKafkaConfiguration}}")
> >>            .end()
> >>            .from("#{{myKafka}}")
> >>
> >> But it ends up with another error, because you cannot consume from a Bean
> >> endpoint (https://camel.apache.org/components/3.18.x/bean-component.html).
> >>
> >> How can we use a different KafkaConsumer for every created route? Or, if the
> >> issue is SEDA related, how can we make this route a direct route?
> >>
> >> Thank you for your help
> >>
> >
> >
> > --
> > Otavio R. Piske
> > http://orpiske.net
>


-- 
Otavio R. Piske
http://orpiske.net

Re: Camel and Kafka manual commit : java.util.ConcurrentModificationException

Posted by Otavio Rodolfo Piske <an...@gmail.com>.
Thanks for the reproducer; this makes it much easier to understand. And
thanks, Claus, for pointing out the async nature of the resequencer.

I am going to add a note to the documentation about the limitations of
using manual commit along with operations that may run on separate threads.
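
In route terms the limitation looks roughly like this (a simplified sketch of the
shape discussed in this thread; URIs, the header name and the backend endpoint are
placeholders, and the KafkaManualCommit import is shown as in Camel 3.18.x):

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;

public class ManualCommitAfterResequenceRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers={{brokers}}")
            // up to this point the exchange is still on the Kafka consumer thread,
            // so a manual commit would be safe here
            .resequence(header("dmlTimestamp")).batch().timeout(100)
            // the batch resequencer emits its output from its own background thread,
            // so everything below runs off the consumer thread
            .to("http://backend/api")
            .process(exchange -> {
                // present because allowManualCommit=true is set on the endpoint
                KafkaManualCommit manual = exchange.getIn()
                        .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                // committing here touches the KafkaConsumer from the resequencer's
                // thread and fails with "KafkaConsumer is not safe for multi-threaded access"
                manual.commit();
            });
    }
}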

Kind regards

On Wed, Nov 23, 2022 at 10:35 PM Ivan <iv...@gmail.com> wrote:

> Ok thanks.
>
> Ivan Rododendro
>
> > On 23 Nov 2022 at 18:01, Claus Ibsen <cl...@gmail.com> wrote:
> >
> > Hi
> >
> > The resequencer EIP is async, so its output is processed by another thread,
> > where manual commit from Kafka is not supported: you must do that on the
> > same thread that received the message.
> >
> >> On Wed, Nov 23, 2022 at 3:34 PM Ivan Rododendro <
> ivan.rododendro@gmail.com>
> >> wrote:
> >>
> >> Fair enough.
> >>
> >> I pushed a reproducer here based on Testcontainers... *but it doesn't
> >> reproduce the issue*:
> >> https://github.com/ivanrododendro/reprex-camelmanualcommit.git
> >>
> >> It turns out that my issue is caused by a header-based resequencer in
> >> the route; basically, having this in the route:
> >> .resequence(header(Constants.DML_TIMESTAMP_HEADER)).batch().timeout(100)
> >> produces this:
> >> CaughtExceptionType: java.util.ConcurrentModificationException
> >> CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
> >>
> >> The resequencer is in the reproducer too, but it does interfere with
> >> Kafka manual commit.
> >>
> >> Nevertheless, I'd like to understand the following logs from the reproducer:
> >> INFO  route-1.log - Message received
> >> INFO  route-2.log - Message received
> >> INFO  o.apache.camel.builder.RouteBuilder.process - Processing message
> from
> >> route [route-1]
> >> INFO  o.i.r.c.TemplateBuilder.process - Committed Kafka offset from
> route
> >> [route-2]
> >>
> >> Two routes from the same template, with a filter based on message headers.
> >> Route 1 processes the message; route 2 discards it, but it still triggers
> >> .onCompletion().onCompleteOnly() (which triggers the Kafka manual commit)...
> >> it looks weird to me.
> >>
> >> Regards
> >>
> >>
> >>
> >> On Tue, Nov 22, 2022 at 2:26 PM Otavio Rodolfo Piske <
> angusyoung@gmail.com
> >>>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> Thanks. I have to be honest with you: I truly want to look more closely at
> >>> this one, but it's been a bit hard to make sense of the code you have
> >>> provided so far. It's not something where I can quickly modify one of our
> >>> unit tests and run it.
> >>>
> >>> In this case ...
> >>>
> >>> Please, can you provide a full reproducer and send the code? Please put it
> >>> on GitHub, so I can clone it, reproduce, and debug. That would make
> >>> investigating and fixing this much easier and quicker for me.
> >>>
> >>> Kind regards
> >>>
> >>> On Tue, Nov 22, 2022 at 12:37 PM Ivan Rododendro <
> >>> ivan.rododendro@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Otavio,
> >>>> I've been busy...
> >>>>
> >>>> I upgraded to Camel 3.18.3:
> >>>> 22-11-2022 12:35:10.829 [restartedMain] INFO
> >>>> o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel 3.18.3
> >>>> (camel-1) is starting
> >>>>
> >>>> Still I have the error:
> >>>> CaughtExceptionType: java.util.ConcurrentModificationException
> >>>> CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded access
> >>>> StackTrace: java.util.ConcurrentModificationException:
> >>>> KafkaConsumer is not safe for multi-threaded access
> >>>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450)
> >>>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434)
> >>>> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491)
> >>>> at org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89)
> >>>> at org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31)
> >>>> at fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18)
> >>>> at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
> >>>> at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818)
> >>>> at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726)
> >>>> at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
> >>>> at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
> >>>> at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
> >>>> at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
> >>>> at org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320)
> >>>> at org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560)
> >>>> at org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483)
> >>>>
> >>>> Ivan
> >>>>
> >>>> On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske <
> >>> angusyoung@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi, thanks
> >>>>>
> >>>>> Please, can you try with Camel 3.18.3? Along with the rest of the
> >>>>> community, we introduced a lot of fixes on the Kafka component on
> >>> 3.18.2
> >>>>> and 3.18.3. Maybe it will help you solve the problem (and, at the
> >> same
> >>>>> time, it's an LTS version, so we can fix it if there's a problem).
> >>>>>
> >>>>> Thanks in advance
> >>>>>
> >>>>> On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <
> >>>> ivan.rododendro@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Otavio,
> >>>>>> Camel version is 3.17.0
> >>>>>>
> >>>>>> thank you
> >>>>>>
> >>>>>> On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <
> >>>>> angusyoung@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Ivan,
> >>>>>>>
> >>>>>>> For Kafka (consumer) objects, yes, they should be created for
> >> every
> >>>>>> route.
> >>>>>>> I want to try to take a look at it during this week or in the
> >> next.
> >>>>>>>
> >>>>>>> In the meantime, can you please tell me which version of Camel
> >> you
> >>>> are
> >>>>>>> using?
> >>>>>>>
> >>>>>>> Kind regards
> >>>>>>>
> >>>>>>> On Wed, Oct 26, 2022 at 6:50 PM Ivan <iv...@gmail.com>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Oktavio thank you for your response.
> >>>>>>>>
> >>>>>>>> The commit is made synchronously by the kafkaOffsetProcessor
> >>> (whose
> >>>>>> code
> >>>>>>> I
> >>>>>>>> forgot to attach, more details here
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> >>>>>>> ).
> >>>>>>>> Basically it does what the documentation says for synchronous
> >>>>> commits.
> >>>>>>>>
> >>>>>>>> Multiple threads cannot access the same kafka client that’s it,
> >>> but
> >>>>>> camel
> >>>>>>>> handles instances and threads, so not so easy to fix for me.
> >>>>>>>>
> >>>>>>>> I think about seda because I know that for seda endpoint object
> >>> are
> >>>>>>> pooled
> >>>>>>>> (if my understanding is right).
> >>>>>>>>
> >>>>>>>> Are you sure that new objects are created for every route
> >> created
> >>>>> from
> >>>>>> a
> >>>>>>>> template ?
> >>>>>>>>
> >>>>>>>> Ivan Rododendro
> >>>>>>>>
> >>>>>>>>> Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske <
> >>>>> angusyoung@gmail.com>
> >>>>>> a
> >>>>>>>> écrit :
> >>>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> From the code you provided, it's not very clear to me when
> >> and
> >>>>> where
> >>>>>>> you
> >>>>>>>>> are calling the commit. Also it's not very clear to me: which
> >>>>> version
> >>>>>>> of
> >>>>>>>>> Camel you are using and which kind of commit factory you are
> >>>> using
> >>>>>>> (async
> >>>>>>>>> [1] or sync [2]?).
> >>>>>>>>>
> >>>>>>>>> That said ...The problem here is that - as explained in the
> >>>>> exception
> >>>>>>>>> message - the Kafka client cannot be accessed from a
> >> different
> >>>>>> thread.
> >>>>>>>>>
> >>>>>>>>> So, I am not entirely sure that the problem is related to
> >> seda
> >>> or
> >>>>>>>> something
> >>>>>>>>> like that. Also, Camel will indeed, create a different
> >> consumer
> >>>> for
> >>>>>>> every
> >>>>>>>>> route.
> >>>>>>>>>
> >>>>>>>>> Please, can you provide a bit more details about the code you
> >>>> have?
> >>>>>>>>>
> >>>>>>>>> 1.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> >>>>>>>>> 2.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> >>>>>>>>>
> >>>>>>>>>> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <
> >>>>>>>> ivan.rododendro@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hello
> >>>>>>>>>> I'm really new to Camel concepts, our need is to create some
> >>>>>> identical
> >>>>>>>>>> routes, identical except for some parameters, from a Kafka
> >>> topic
> >>>>> to
> >>>>>> a
> >>>>>>>> http
> >>>>>>>>>> endpoint, with some processing in-between.
> >>>>>>>>>>
> >>>>>>>>>> Besides this we want to explicitly commit the message
> >>>> consumption
> >>>>>> only
> >>>>>>>> when
> >>>>>>>>>> the http endpoint has been successfully called.
> >>>>>>>>>>
> >>>>>>>>>> In order to achieve this we set up a route template that
> >>> carries
> >>>>> the
> >>>>>>>> Route
> >>>>>>>>>> parameterization and set it up to manually commit after
> >> having
> >>>>>> called
> >>>>>>>> the
> >>>>>>>>>> http endpoint :
> >>>>>>>>>> public void configure() throws Exception {
> >>>>>>>>>>       // @formatter:off
> >>>>>>>>>>       routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> >>>>>>>>>>           .templateParameter(Constantes.JOB_NAME)
> >>>>>>>>>>           .templateParameter(Constantes.TOPIC)
> >>>>>>>>>>           .templateParameter(Constantes.PUBLISHER_ID)
> >>>>>>>>>>
> >>>> .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> >>>>>>>>>>           .templateParameter(Constantes.JOB_NAME_PARAMETER)
> >>>>>>>>>>
> >>>> .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> >>>>>>>>>>           .from(getKafkaEndpoint())
> >>>>>>>>>>           .messageHistory()
> >>>>>>>>>>           .filter(simple("${header.publisherId} ==
> >>>>>>> '{{publisherId}}'"))
> >>>>>>>>>>           .process(messageLoggerProcessor)
> >>>>>>>>>>           .process(modelMapperProcessor)
> >>>>>>>>>>           .process(jsonlToArrayProcessor)
> >>>>>>>>>>           .process(payloadProcessor)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> >>>>>>>>>>           .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> >>>>>>>>>>           .setHeader(Exchange.CONTENT_TYPE,
> >>>>>>>>>> constant("application/json;charset=UTF-8"))
> >>>>>>>>>>
> >>>>>> .setHeader(Constantes.ACCEPT,constant("application/json"))
> >>>>>>>>>>           .setHeader(Constantes.API_KEY, constant(apiKey))
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> >>>>>>>>>>           .process(apiConsumerProcessorLogger)
> >>>>>>>>>>           .to(this.url)
> >>>>>>>>>>           .process(kafkaOffsetProcessor);
> >>>>>>>>>>       // @formatter:on
> >>>>>>>>>>   }
> >>>>>>>>>>
> >>>>>>>>>>   private String getKafkaEndpoint() {
> >>>>>>>>>>       String endpoint =
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >> "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
> >>>>>>> +
> >>>>>>>>>> kafkaBrokers;
> >>>>>>>>>>
> >>>>>>>>>>       if (securityEnabled()) {
> >>>>>>>>>>           endpoint += "&securityProtocol=SASL_SSL" +
> >>>>>>>>>> "&saslMechanism=PLAIN"
> >>>>>>>>>>                   +
> >>>>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
> >>>>>>>>>> required username=\""
> >>>>>>>>>>                   + username + "\" password=\"" + password
> >> +
> >>>>> "\";"
> >>>>>> +
> >>>>>>>>>> "&sslTruststoreLocation=" + sslTrustStoreLocation
> >>>>>>>>>>                   + "&sslTruststorePassword=" +
> >>>>>>> sslTruststorePassword;
> >>>>>>>>>>       }
> >>>>>>>>>>
> >>>>>>>>>>       return endpoint;
> >>>>>>>>>>   }
> >>>>>>>>>>
> >>>>>>>>>> The problem is that we systematically get this error when a
> >>>>> message
> >>>>>> is
> >>>>>>>>>> consumed by a route :
> >>>>>>>>>>
> >>>>>>>>>> Trace: java.util.ConcurrentModificationException:
> >>> KafkaConsumer
> >>>> is
> >>>>>> not
> >>>>>>>>>> safe for multi-threaded access
> >>>>>>>>>>   at
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> >>>>>>>>>>   at
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> >>>>>>>>>>   at
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> >>>>>>>>>>   at
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> >>>>>>>>>>   at
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> My understanding is that the instance of KafkaConsumer is
> >>> reused
> >>>>> in
> >>>>>>>>>> multiple routes and therefore it generates the error, but it
> >>>> could
> >>>>>> be
> >>>>>>>> also
> >>>>>>>>>> related to using SEDA endpoint as stated here (
> >>>>>>>>>> https://issues.apache.org/jira/browse/CAMEL-12722), which
> >> we
> >>>>> don't
> >>>>>>>>>> explicitly do.
> >>>>>>>>>>
> >>>>>>>>>> We tried injecting a KafkaComponent local bean in the route
> >> :
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
> >>>>>>>>>> "{{" + Constantes.TOPIC
> >> +"}}").properties(kafkaConfiguration)
> >>>>>>>>>>           .end()
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
> >>>>>>>>>> "#{{myKafkaConfiguration}}")
> >>>>>>>>>>           .end()
> >>>>>>>>>>           .from("#{{myKafka}}")
> >>>>>>>>>>
> >>>>>>>>>> But it ends up with another error because you cannot
> >> consume a
> >>>>> Bean
> >>>>>>>>>> endpoint (
> >>>>>>>> https://camel.apache.org/components/3.18.x/bean-component.html
> >> )
> >>>>>>>>>>
> >>>>>>>>>> How to use a different KafkaConsumer for every created
> >> route ?
> >>>> Or,
> >>>>>> if
> >>>>>>>> the
> >>>>>>>>>> issue is SEDA related, how to make this route a direct
> >> route?
> >>>>>>>>>>
> >>>>>>>>>> Thank you for your help
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Otavio R. Piske
> >>>>>>>>> http://orpiske.net
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Otavio R. Piske
> >>>>>>> http://orpiske.net
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Otavio R. Piske
> >>>>> http://orpiske.net
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> Otavio R. Piske
> >>> http://orpiske.net
> >>>
> >>
> >
> >
> > --
> > Claus Ibsen
> > -----------------
> > @davsclaus
> > Camel in Action 2: https://www.manning.com/ibsen2
>


-- 
Otavio R. Piske
http://orpiske.net

Re: Camel and Kafka manual commit : java.util.ConcurrentModificationException

Posted by Ivan <iv...@gmail.com>.
Ok thanks. 

Ivan Rododendro

> On 23 Nov 2022 at 18:01, Claus Ibsen <cl...@gmail.com> wrote:
> 
> Hi
> 
> The resequencer EIP is async, so its output is processed by another thread,
> which manual commit from kafka is not supported as you must do that on the
> same thread that received the message.
> 
>> On Wed, Nov 23, 2022 at 3:34 PM Ivan Rododendro <iv...@gmail.com>
>> wrote:
>> 
>> Fair enough.
>> 
>> I pushed a reproducer here based on testcontainer ...  *but it don't
>> reproduces the issue *:
>> https://github.com/ivanrododendro/reprex-camelmanualcommit.git
>> 
>> It turns out that my issue is caused by a header based resequencer in
>> the route, basically having this in the route :
>> .resequence(header(Constants.DML_TIMESTAMP_HEADER)).batch().timeout(100)
>> produces this :
>> CaughtExceptionType: java.util.ConcurrentModificationException
>> CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
>> 
>> The resequencer is in the reproducer too but it does interfere with kafka
>> manual commit.
>> 
>> Nevertheless I'd like to understand  the following logs from the reproducer
>> :
>> INFO  route-1.log - Message received
>> INFO  route-2.log - Message received
>> INFO  o.apache.camel.builder.RouteBuilder.process - Processing message from
>> route [route-1]
>> INFO  o.i.r.c.TemplateBuilder.process - Committed Kafka offset from route
>> [route-2]
>> 
>> Two routes from the same template, with a filter based on message headers.
>> Route 1 processes the message, route 2 discards it but it triggers
>> .onCompletion().onCompleteOnly() (which triggers Kafka manual commit) ..
>> it looks weird to me.
>> 
>> Regards
>> 
>> 
>> 
>> On Tue, Nov 22, 2022 at 2:26 PM Otavio Rodolfo Piske <angusyoung@gmail.com
>>> 
>> wrote:
>> 
>>> Hi,
>>> 
>>> Thanks. I have to be honest with you: I truly want to look more closely
>> at
>>> this one, but it's been a bit hard to try to make sense of the code you
>>> provided so far. It's not something I can quickly modify one of our unit
>>> tests and run.
>>> 
>>> In this case ...
>>> 
>>> Please, can you provide a full reproducer and send the code? Please put
>> it
>>> on Github, so I can clone and reproduce and debug? That would make
>>> investigating and fixing this much easier and quicker for me.
>>> 
>>> Kind regards
>>> 
>>> On Tue, Nov 22, 2022 at 12:37 PM Ivan Rododendro <
>>> ivan.rododendro@gmail.com>
>>> wrote:
>>> 
>>>> Hi Octavio
>>>> I've been  busy ...
>>>> 
>>>> I upgraded to Came 3.18.3 :
>>>> 22-11-2022 12:35:10.829 [restartedMain] INFO
>>>> o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel
>> 3.18.3
>>>> (camel-1) is starting
>>>> 
>>>> Still I have the error :
>>>> CaughtExceptionType: java.util.ConcurrentModificationException
>>>> CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
>>>> access  StackTrace: java.util.ConcurrentModificationException:
>>>> KafkaConsumer is not safe for multi-threaded access
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31)
>>>> at
>>>> 
>>>> 
>>> 
>> fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
>>>> at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483)
>>>> 
>>>> Ivan
>>>> 
>>>> On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske <
>>> angusyoung@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi, thanks
>>>>> 
>>>>> Please, can you try with Camel 3.18.3? Along with the rest of the
>>>>> community, we introduced a lot of fixes on the Kafka component on
>>> 3.18.2
>>>>> and 3.18.3. Maybe it will help you solve the problem (and, at the
>> same
>>>>> time, it's an LTS version, so we can fix it if there's a problem).
>>>>> 
>>>>> Thanks in advance
>>>>> 
>>>>> On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <
>>>> ivan.rododendro@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi Otavio,
>>>>>> Camel version is 3.17.0
>>>>>> 
>>>>>> thank you
>>>>>> 
>>>>>> On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <
>>>>> angusyoung@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi Ivan,
>>>>>>> 
>>>>>>> For Kafka (consumer) objects, yes, they should be created for
>> every
>>>>>> route.
>>>>>>> I want to try to take a look at it during this week or in the
>> next.
>>>>>>> 
>>>>>>> In the meantime, can you please tell me which version of Camel
>> you
>>>> are
>>>>>>> using?
>>>>>>> 
>>>>>>> Kind regards
>>>>>>> 
>>>>>>> On Wed, Oct 26, 2022 at 6:50 PM Ivan <iv...@gmail.com>
>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Oktavio thank you for your response.
>>>>>>>> 
>>>>>>>> The commit is made synchronously by the kafkaOffsetProcessor
>>> (whose
>>>>>> code
>>>>>>> I
>>>>>>>> forgot to attach, more details here
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
>>>>>>> ).
>>>>>>>> Basically it does what the documentation says for synchronous
>>>>> commits.
>>>>>>>> 
>>>>>>>> Multiple threads cannot access the same kafka client that’s it,
>>> but
>>>>>> camel
>>>>>>>> handles instances and threads, so not so easy to fix for me.
>>>>>>>> 
>>>>>>>> I think about seda because I know that for seda endpoint object
>>> are
>>>>>>> pooled
>>>>>>>> (if my understanding is right).
>>>>>>>> 
>>>>>>>> Are you sure that new objects are created for every route
>> created
>>>>> from
>>>>>> a
>>>>>>>> template ?
>>>>>>>> 
>>>>>>>> Ivan Rododendro
>>>>>>>> 
>>>>>>>>> Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske <
>>>>> angusyoung@gmail.com>
>>>>>> a
>>>>>>>> écrit :
>>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> 
>>>>>>>>> From the code you provided, it's not very clear to me when
>> and
>>>>> where
>>>>>>> you
>>>>>>>>> are calling the commit. Also it's not very clear to me: which
>>>>> version
>>>>>>> of
>>>>>>>>> Camel you are using and which kind of commit factory you are
>>>> using
>>>>>>> (async
>>>>>>>>> [1] or sync [2]?).
>>>>>>>>> 
>>>>>>>>> That said ...The problem here is that - as explained in the
>>>>> exception
>>>>>>>>> message - the Kafka client cannot be accessed from a
>> different
>>>>>> thread.
>>>>>>>>> 
>>>>>>>>> So, I am not entirely sure that the problem is related to
>> seda
>>> or
>>>>>>>> something
>>>>>>>>> like that. Also, Camel will indeed, create a different
>> consumer
>>>> for
>>>>>>> every
>>>>>>>>> route.
>>>>>>>>> 
>>>>>>>>> Please, can you provide a bit more details about the code you
>>>> have?
>>>>>>>>> 
>>>>>>>>> 1.
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
>>>>>>>>> 2.
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
>>>>>>>>> 
>>>>>>>>>> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <
>>>>>>>> ivan.rododendro@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hello
>>>>>>>>>> I'm really new to Camel concepts, our need is to create some
>>>>>> identical
>>>>>>>>>> routes, identical except for some parameters, from a Kafka
>>> topic
>>>>> to
>>>>>> a
>>>>>>>> http
>>>>>>>>>> endpoint, with some processing in-between.
>>>>>>>>>> 
>>>>>>>>>> Besides this we want to explicitly commit the message
>>>> consumption
>>>>>> only
>>>>>>>> when
>>>>>>>>>> the http endpoint has been successfully called.
>>>>>>>>>> 
>>>>>>>>>> In order to achieve this we set up a route template that
>>> carries
>>>>> the
>>>>>>>> Route
>>>>>>>>>> parameterization and set it up to manually commit after
>> having
>>>>>> called
>>>>>>>> the
>>>>>>>>>> http endpoint :
>>>>>>>>>> public void configure() throws Exception {
>>>>>>>>>>       // @formatter:off
>>>>>>>>>>       routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
>>>>>>>>>>           .templateParameter(Constantes.JOB_NAME)
>>>>>>>>>>           .templateParameter(Constantes.TOPIC)
>>>>>>>>>>           .templateParameter(Constantes.PUBLISHER_ID)
>>>>>>>>>> 
>>>> .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
>>>>>>>>>>           .templateParameter(Constantes.JOB_NAME_PARAMETER)
>>>>>>>>>> 
>>>> .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
>>>>>>>>>>           .from(getKafkaEndpoint())
>>>>>>>>>>           .messageHistory()
>>>>>>>>>>           .filter(simple("${header.publisherId} ==
>>>>>>> '{{publisherId}}'"))
>>>>>>>>>>           .process(messageLoggerProcessor)
>>>>>>>>>>           .process(modelMapperProcessor)
>>>>>>>>>>           .process(jsonlToArrayProcessor)
>>>>>>>>>>           .process(payloadProcessor)
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>> 
>>>>> 
>>> .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
>>>>>>>>>>           .setHeader(Exchange.HTTP_METHOD, simple("POST"))
>>>>>>>>>>           .setHeader(Exchange.CONTENT_TYPE,
>>>>>>>>>> constant("application/json;charset=UTF-8"))
>>>>>>>>>> 
>>>>>> .setHeader(Constantes.ACCEPT,constant("application/json"))
>>>>>>>>>>           .setHeader(Constantes.API_KEY, constant(apiKey))
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
>>>>>>>>>>           .process(apiConsumerProcessorLogger)
>>>>>>>>>>           .to(this.url)
>>>>>>>>>>           .process(kafkaOffsetProcessor);
>>>>>>>>>>       // @formatter:on
>>>>>>>>>>   }
>>>>>>>>>> 
>>>>>>>>>>   private String getKafkaEndpoint() {
>>>>>>>>>>       String endpoint =
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>> 
>> "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
>>>>>>> +
>>>>>>>>>> kafkaBrokers;
>>>>>>>>>> 
>>>>>>>>>>       if (securityEnabled()) {
>>>>>>>>>>           endpoint += "&securityProtocol=SASL_SSL" +
>>>>>>>>>> "&saslMechanism=PLAIN"
>>>>>>>>>>                   +
>>>>>>>>>> 
>>>>>>> 
>>>>> 
>>> "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
>>>>>>>>>> required username=\""
>>>>>>>>>>                   + username + "\" password=\"" + password
>> +
>>>>> "\";"
>>>>>> +
>>>>>>>>>> "&sslTruststoreLocation=" + sslTrustStoreLocation
>>>>>>>>>>                   + "&sslTruststorePassword=" +
>>>>>>> sslTruststorePassword;
>>>>>>>>>>       }
>>>>>>>>>> 
>>>>>>>>>>       return endpoint;
>>>>>>>>>>   }
>>>>>>>>>> 
>>>>>>>>>> The problem is that we systematically get this error when a
>>>>> message
>>>>>> is
>>>>>>>>>> consumed by a route :
>>>>>>>>>> 
>>>>>>>>>> Trace: java.util.ConcurrentModificationException:
>>> KafkaConsumer
>>>> is
>>>>>> not
>>>>>>>>>> safe for multi-threaded access
>>>>>>>>>>   at
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
>>>>>>>>>>   at
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
>>>>>>>>>>   at
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
>>>>>>>>>>   at
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
>>>>>>>>>>   at
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> My understanding is that the instance of KafkaConsumer is
>>> reused
>>>>> in
>>>>>>>>>> multiple routes and therefore it generates the error, but it
>>>> could
>>>>>> be
>>>>>>>> also
>>>>>>>>>> related to using SEDA endpoint as stated here (
>>>>>>>>>> https://issues.apache.org/jira/browse/CAMEL-12722), which
>> we
>>>>> don't
>>>>>>>>>> explicitly do.
>>>>>>>>>> 
>>>>>>>>>> We tried injecting a KafkaComponent local bean in the route
>> :
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
>>>>>>>>>> "{{" + Constantes.TOPIC
>> +"}}").properties(kafkaConfiguration)
>>>>>>>>>>           .end()
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
>>>>>>>>>> "#{{myKafkaConfiguration}}")
>>>>>>>>>>           .end()
>>>>>>>>>>           .from("#{{myKafka}}")
>>>>>>>>>> 
>>>>>>>>>> But it ends up with another error because you cannot
>> consume a
>>>>> Bean
>>>>>>>>>> endpoint (
>>>>>>>> https://camel.apache.org/components/3.18.x/bean-component.html
>> )
>>>>>>>>>> 
>>>>>>>>>> How to use a different KafkaConsumer for every created
>> route ?
>>>> Or,
>>>>>> if
>>>>>>>> the
>>>>>>>>>> issue is SEDA related, how to make this route a direct
>> route?
>>>>>>>>>> 
>>>>>>>>>> Thank you for your help
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> Otavio R. Piske
>>>>>>>>> http://orpiske.net
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> Otavio R. Piske
>>>>>>> http://orpiske.net
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> Otavio R. Piske
>>>>> http://orpiske.net
>>>>> 
>>>> 
>>> 
>>> 
>>> --
>>> Otavio R. Piske
>>> http://orpiske.net
>>> 
>> 
> 
> 
> -- 
> Claus Ibsen
> -----------------
> @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2

Re: Camel and Kafka manual commit : java.util.ConcurrentModificationException

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

The resequencer EIP is async, so its output is processed by another thread,
where manual commit from Kafka is not supported: you must do that on the
same thread that received the message.
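
In other words, the manual commit has to happen while the exchange is still on
the Kafka consumer thread, i.e. before the resequencer (or any other async EIP)
takes over. A minimal sketch of that constraint, with placeholder URIs and Camel
3.18.x package names; note that committing this early no longer ties the commit
to the downstream HTTP call succeeding:

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;

public class CommitOnConsumerThreadRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers={{brokers}}")
            // still on the Kafka consumer thread: manual commit is allowed here
            .process(exchange -> {
                KafkaManualCommit manual = exchange.getIn()
                        .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                if (manual != null) {
                    manual.commit();
                }
            })
            // from here on the batch resequencer hands the exchange to another thread
            .resequence(header("dmlTimestamp")).batch().timeout(100)
            .to("http://backend/api");
    }
}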

On Wed, Nov 23, 2022 at 3:34 PM Ivan Rododendro <iv...@gmail.com>
wrote:

> Fair enough.
>
> I pushed a reproducer here based on testcontainer ...  *but it don't
> reproduces the issue *:
> https://github.com/ivanrododendro/reprex-camelmanualcommit.git
>
> It turns out that my issue is caused by a header based resequencer in
> the route, basically having this in the route :
>  .resequence(header(Constants.DML_TIMESTAMP_HEADER)).batch().timeout(100)
> produces this :
> CaughtExceptionType: java.util.ConcurrentModificationException
>  CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
>
> The resequencer is in the reproducer too but it does interfere with kafka
> manual commit.
>
> Nevertheless I'd like to understand  the following logs from the reproducer
> :
> INFO  route-1.log - Message received
> INFO  route-2.log - Message received
> INFO  o.apache.camel.builder.RouteBuilder.process - Processing message from
> route [route-1]
> INFO  o.i.r.c.TemplateBuilder.process - Committed Kafka offset from route
> [route-2]
>
> Two routes from the same template, with a filter based on message headers.
> Route 1 processes the message, route 2 discards it but it triggers
> .onCompletion().onCompleteOnly() (which triggers Kafka manual commit) ..
> it looks weird to me.
>
> Regards
>
>
>
> On Tue, Nov 22, 2022 at 2:26 PM Otavio Rodolfo Piske <angusyoung@gmail.com
> >
> wrote:
>
> > Hi,
> >
> > Thanks. I have to be honest with you: I truly want to look more closely
> at
> > this one, but it's been a bit hard to try to make sense of the code you
> > provided so far. It's not something I can quickly modify one of our unit
> > tests and run.
> >
> > In this case ...
> >
> > Please, can you provide a full reproducer and send the code? Please put
> it
> > on Github, so I can clone and reproduce and debug? That would make
> > investigating and fixing this much easier and quicker for me.
> >
> > Kind regards
> >
> > On Tue, Nov 22, 2022 at 12:37 PM Ivan Rododendro <
> > ivan.rododendro@gmail.com>
> > wrote:
> >
> > > Hi Octavio
> > > I've been  busy ...
> > >
> > > I upgraded to Came 3.18.3 :
> > > 22-11-2022 12:35:10.829 [restartedMain] INFO
> > >  o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel
> 3.18.3
> > > (camel-1) is starting
> > >
> > > Still I have the error :
> > > CaughtExceptionType: java.util.ConcurrentModificationException
> > >  CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
> > > access  StackTrace: java.util.ConcurrentModificationException:
> > > KafkaConsumer is not safe for multi-threaded access
> > > at
> > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450)
> > > at
> > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434)
> > > at
> > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491)
> > > at
> > >
> > >
> >
> org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89)
> > > at
> > >
> > >
> >
> org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31)
> > > at
> > >
> > >
> >
> fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18)
> > > at
> > >
> > >
> >
> org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
> > > at
> > >
> > >
> >
> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818)
> > > at
> > >
> > >
> >
> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726)
> > > at
> > >
> > >
> >
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
> > > at
> > >
> > >
> >
> org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
> > > at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
> > > at
> > >
> > >
> >
> org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
> > > at
> > >
> > >
> >
> org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320)
> > > at
> > >
> > >
> >
> org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560)
> > > at
> > >
> > >
> >
> org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483)
> > >
> > > Ivan
> > >
> > > On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske <
> > angusyoung@gmail.com>
> > > wrote:
> > >
> > > > Hi, thanks
> > > >
> > > > Please, can you try with Camel 3.18.3? Along with the rest of the
> > > > community, we introduced a lot of fixes on the Kafka component on
> > 3.18.2
> > > > and 3.18.3. Maybe it will help you solve the problem (and, at the
> same
> > > > time, it's an LTS version, so we can fix it if there's a problem).
> > > >
> > > > Thanks in advance
> > > >
> > > > On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <
> > > ivan.rododendro@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Otavio,
> > > > > Camel version is 3.17.0
> > > > >
> > > > > thank you
> > > > >
> > > > > On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <
> > > > angusyoung@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Ivan,
> > > > > >
> > > > > > For Kafka (consumer) objects, yes, they should be created for
> every
> > > > > route.
> > > > > > I want to try to take a look at it during this week or in the
> next.
> > > > > >
> > > > > > In the meantime, can you please tell me which version of Camel
> you
> > > are
> > > > > > using?
> > > > > >
> > > > > > Kind regards
> > > > > >
> > > > > > On Wed, Oct 26, 2022 at 6:50 PM Ivan <iv...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hi Oktavio thank you for your response.
> > > > > > >
> > > > > > > The commit is made synchronously by the kafkaOffsetProcessor
> > (whose
> > > > > code
> > > > > > I
> > > > > > > forgot to attach, more details here
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> > > > > > ).
> > > > > > > Basically it does what the documentation says for synchronous
> > > > commits.
> > > > > > >
> > > > > > > Multiple threads cannot access the same kafka client that’s it,
> > but
> > > > > camel
> > > > > > > handles instances and threads, so not so easy to fix for me.
> > > > > > >
> > > > > > > I think about seda because I know that for seda endpoint object
> > are
> > > > > > pooled
> > > > > > > (if my understanding is right).
> > > > > > >
> > > > > > > Are you sure that new objects are created for every route
> created
> > > > from
> > > > > a
> > > > > > > template ?
> > > > > > >
> > > > > > > Ivan Rododendro
> > > > > > >
> > > > > > > > Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske <
> > > > angusyoung@gmail.com>
> > > > > a
> > > > > > > écrit :
> > > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > From the code you provided, it's not very clear to me when
> and
> > > > where
> > > > > > you
> > > > > > > > are calling the commit. Also it's not very clear to me: which
> > > > version
> > > > > > of
> > > > > > > > Camel you are using and which kind of commit factory you are
> > > using
> > > > > > (async
> > > > > > > > [1] or sync [2]?).
> > > > > > > >
> > > > > > > > That said ...The problem here is that - as explained in the
> > > > exception
> > > > > > > > message - the Kafka client cannot be accessed from a
> different
> > > > > thread.
> > > > > > > >
> > > > > > > > So, I am not entirely sure that the problem is related to
> seda
> > or
> > > > > > > something
> > > > > > > > like that. Also, Camel will indeed, create a different
> consumer
> > > for
> > > > > > every
> > > > > > > > route.
> > > > > > > >
> > > > > > > > Please, can you provide a bit more details about the code you
> > > have?
> > > > > > > >
> > > > > > > > 1.
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> > > > > > > > 2.
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> > > > > > > >
> > > > > > > >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <
> > > > > > > ivan.rododendro@gmail.com>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >> Hello
> > > > > > > >> I'm really new to Camel concepts, our need is to create some
> > > > > identical
> > > > > > > >> routes, identical except for some parameters, from a Kafka
> > topic
> > > > to
> > > > > a
> > > > > > > http
> > > > > > > >> endpoint, with some processing in-between.
> > > > > > > >>
> > > > > > > >> Besides this we want to explicitly commit the message
> > > consumption
> > > > > only
> > > > > > > when
> > > > > > > >> the http endpoint has been successfully called.
> > > > > > > >>
> > > > > > > >> In order to achieve this we set up a route template that
> > carries
> > > > the
> > > > > > > Route
> > > > > > > >> parameterization and set it up to manually commit after
> having
> > > > > called
> > > > > > > the
> > > > > > > >> http endpoint :
> > > > > > > >> public void configure() throws Exception {
> > > > > > > >>        // @formatter:off
> > > > > > > >>        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> > > > > > > >>            .templateParameter(Constantes.JOB_NAME)
> > > > > > > >>            .templateParameter(Constantes.TOPIC)
> > > > > > > >>            .templateParameter(Constantes.PUBLISHER_ID)
> > > > > > > >>
> > > .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > > > > >>            .templateParameter(Constantes.JOB_NAME_PARAMETER)
> > > > > > > >>
> > > .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > > > > >>            .from(getKafkaEndpoint())
> > > > > > > >>            .messageHistory()
> > > > > > > >>            .filter(simple("${header.publisherId} ==
> > > > > > '{{publisherId}}'"))
> > > > > > > >>            .process(messageLoggerProcessor)
> > > > > > > >>            .process(modelMapperProcessor)
> > > > > > > >>            .process(jsonlToArrayProcessor)
> > > > > > > >>            .process(payloadProcessor)
> > > > > > > >>
> > > > > > > >>
> > > > > >
> > > >
> > .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> > > > > > > >>            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> > > > > > > >>            .setHeader(Exchange.CONTENT_TYPE,
> > > > > > > >> constant("application/json;charset=UTF-8"))
> > > > > > > >>
> > > > > .setHeader(Constantes.ACCEPT,constant("application/json"))
> > > > > > > >>            .setHeader(Constantes.API_KEY, constant(apiKey))
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> > > > > > > >>            .process(apiConsumerProcessorLogger)
> > > > > > > >>            .to(this.url)
> > > > > > > >>            .process(kafkaOffsetProcessor);
> > > > > > > >>        // @formatter:on
> > > > > > > >>    }
> > > > > > > >>
> > > > > > > >>    private String getKafkaEndpoint() {
> > > > > > > >>        String endpoint =
> > > > > > > >>
> > > > > > >
> > > > >
> > >
> "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
> > > > > > +
> > > > > > > >> kafkaBrokers;
> > > > > > > >>
> > > > > > > >>        if (securityEnabled()) {
> > > > > > > >>            endpoint += "&securityProtocol=SASL_SSL" +
> > > > > > > >> "&saslMechanism=PLAIN"
> > > > > > > >>                    +
> > > > > > > >>
> > > > > >
> > > >
> > "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
> > > > > > > >> required username=\""
> > > > > > > >>                    + username + "\" password=\"" + password
> +
> > > > "\";"
> > > > > +
> > > > > > > >> "&sslTruststoreLocation=" + sslTrustStoreLocation
> > > > > > > >>                    + "&sslTruststorePassword=" +
> > > > > > sslTruststorePassword;
> > > > > > > >>        }
> > > > > > > >>
> > > > > > > >>        return endpoint;
> > > > > > > >>    }
> > > > > > > >>
> > > > > > > >> The problem is that we systematically get this error when a
> > > > message
> > > > > is
> > > > > > > >> consumed by a route :
> > > > > > > >>
> > > > > > > >> Trace: java.util.ConcurrentModificationException:
> > KafkaConsumer
> > > is
> > > > > not
> > > > > > > >> safe for multi-threaded access
> > > > > > > >>    at
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> > > > > > > >>    at
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> > > > > > > >>    at
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> > > > > > > >>    at
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> > > > > > > >>    at
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> My understanding is that the instance of KafkaConsumer is
> > reused
> > > > in
> > > > > > > >> multiple routes and therefore it generates the error, but it
> > > could
> > > > > be
> > > > > > > also
> > > > > > > >> related to using SEDA endpoint as stated here (
> > > > > > > >> https://issues.apache.org/jira/browse/CAMEL-12722), which
> we
> > > > don't
> > > > > > > >> explicitly do.
> > > > > > > >>
> > > > > > > >> We tried injecting a KafkaComponent local bean in the route
> :
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
> > > > > > > >> "{{" + Constantes.TOPIC
> +"}}").properties(kafkaConfiguration)
> > > > > > > >>            .end()
> > > > > > > >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
> > > > > > > >> "#{{myKafkaConfiguration}}")
> > > > > > > >>            .end()
> > > > > > > >>            .from("#{{myKafka}}")
> > > > > > > >>
> > > > > > > >> But it ends up with another error because you cannot
> consume a
> > > > Bean
> > > > > > > >> endpoint (
> > > > > > > https://camel.apache.org/components/3.18.x/bean-component.html
> )
> > > > > > > >>
> > > > > > > >> How to use a different KafkaConsumer for every created
> route ?
> > > Or,
> > > > > if
> > > > > > > the
> > > > > > > >> issue is SEDA related, how to make this route a direct
> route?
> > > > > > > >>
> > > > > > > >> Thank you for your help
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > Otavio R. Piske
> > > > > > > > http://orpiske.net
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Otavio R. Piske
> > > > > > http://orpiske.net
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Otavio R. Piske
> > > > http://orpiske.net
> > > >
> > >
> >
> >
> > --
> > Otavio R. Piske
> > http://orpiske.net
> >
>


-- 
Claus Ibsen
-----------------
@davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

Re: Camel and Kafka manual commit : java.util.ConcurrentModificationException

Posted by Ivan Rododendro <iv...@gmail.com>.
Fair enough.

I pushed a reproducer here based on Testcontainers... *but it doesn't
reproduce the issue*:
https://github.com/ivanrododendro/reprex-camelmanualcommit.git

It turns out that my issue is caused by a header-based resequencer in
the route; basically, having this in the route:
.resequence(header(Constants.DML_TIMESTAMP_HEADER)).batch().timeout(100)
produces this:
CaughtExceptionType: java.util.ConcurrentModificationException
CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded

The resequencer is in the reproducer too, but it does interfere with Kafka
manual commit.

Nevertheless, I'd like to understand the following logs from the reproducer:
INFO  route-1.log - Message received
INFO  route-2.log - Message received
INFO  o.apache.camel.builder.RouteBuilder.process - Processing message from
route [route-1]
INFO  o.i.r.c.TemplateBuilder.process - Committed Kafka offset from route
[route-2]

Two routes are created from the same template, each with a filter based on message
headers. Route 1 processes the message; route 2 discards it, yet route 2 is the one
that triggers .onCompletion().onCompleteOnly() (which triggers the Kafka manual
commit). It looks weird to me.
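
For reference, this is roughly the shape of the template in the reproducer
(hypothetical names, simplified): the commit side effect hangs off onCompletion(),
and each route created from the template filters on a header. As far as I
understand, onCompletion() is route-scoped and fires when the exchange finishes
the route even when the filter discarded the message, which would explain why
route-2 logs the commit for a message it never processed.

import org.apache.camel.builder.RouteBuilder;

public class KafkaTemplateSketch extends RouteBuilder {

    @Override
    public void configure() {
        routeTemplate("kafka-generic")
            .templateParameter("publisherId")
            .from("kafka:demo-topic?allowManualCommit=true&autoCommitEnable=false")
                // route-scoped onCompletion: runs when the exchange has finished the
                // route, regardless of whether the filter below matched
                .onCompletion().onCompleteOnly()
                    .log("Committed Kafka offset from route [${routeId}]")
                .end()
                .filter(simple("${header.publisherId} == '{{publisherId}}'"))
                    .log("Processing message from route [${routeId}]");
    }
}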

Regards



On Tue, Nov 22, 2022 at 2:26 PM Otavio Rodolfo Piske <an...@gmail.com>
wrote:

> Hi,
>
> Thanks. I have to be honest with you: I truly want to look more closely at
> this one, but it's been a bit hard to try to make sense of the code you
> provided so far. It's not something I can quickly modify one of our unit
> tests and run.
>
> In this case ...
>
> Please, can you provide a full reproducer and send the code? Please put it
> on Github, so I can clone and reproduce and debug? That would make
> investigating and fixing this much easier and quicker for me.
>
> Kind regards
>
> On Tue, Nov 22, 2022 at 12:37 PM Ivan Rododendro <
> ivan.rododendro@gmail.com>
> wrote:
>
> > Hi Octavio
> > I've been  busy ...
> >
> > I upgraded to Came 3.18.3 :
> > 22-11-2022 12:35:10.829 [restartedMain] INFO
> >  o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel 3.18.3
> > (camel-1) is starting
> >
> > Still I have the error :
> > CaughtExceptionType: java.util.ConcurrentModificationException
> >  CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
> > access  StackTrace: java.util.ConcurrentModificationException:
> > KafkaConsumer is not safe for multi-threaded access
> > at
> >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450)
> > at
> >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434)
> > at
> >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491)
> > at
> >
> >
> org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89)
> > at
> >
> >
> org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31)
> > at
> >
> >
> fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18)
> > at
> >
> >
> org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
> > at
> >
> >
> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818)
> > at
> >
> >
> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726)
> > at
> >
> >
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
> > at
> >
> >
> org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
> > at
> >
> >
> org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
> > at
> >
> >
> org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320)
> > at
> >
> >
> org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560)
> > at
> >
> >
> org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483)
> >
> > Ivan
> >
> > On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske <
> angusyoung@gmail.com>
> > wrote:
> >
> > > Hi, thanks
> > >
> > > Please, can you try with Camel 3.18.3? Along with the rest of the
> > > community, we introduced a lot of fixes on the Kafka component on
> 3.18.2
> > > and 3.18.3. Maybe it will help you solve the problem (and, at the same
> > > time, it's an LTS version, so we can fix it if there's a problem).
> > >
> > > Thanks in advance
> > >
> > > On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <
> > ivan.rododendro@gmail.com>
> > > wrote:
> > >
> > > > Hi Otavio,
> > > > Camel version is 3.17.0
> > > >
> > > > thank you
> > > >
> > > > On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <
> > > angusyoung@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Ivan,
> > > > >
> > > > > For Kafka (consumer) objects, yes, they should be created for every
> > > > route.
> > > > > I want to try to take a look at it during this week or in the next.
> > > > >
> > > > > In the meantime, can you please tell me which version of Camel you
> > are
> > > > > using?
> > > > >
> > > > > Kind regards
> > > > >
> > > > > On Wed, Oct 26, 2022 at 6:50 PM Ivan <iv...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi Oktavio thank you for your response.
> > > > > >
> > > > > > The commit is made synchronously by the kafkaOffsetProcessor
> (whose
> > > > code
> > > > > I
> > > > > > forgot to attach, more details here
> > > > > >
> > > > >
> > > >
> > >
> >
> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> > > > > ).
> > > > > > Basically it does what the documentation says for synchronous
> > > commits.
> > > > > >
> > > > > > Multiple threads cannot access the same kafka client that’s it,
> but
> > > > camel
> > > > > > handles instances and threads, so not so easy to fix for me.
> > > > > >
> > > > > > I think about seda because I know that for seda endpoint object
> are
> > > > > pooled
> > > > > > (if my understanding is right).
> > > > > >
> > > > > > Are you sure that new objects are created for every route created
> > > from
> > > > a
> > > > > > template ?
> > > > > >
> > > > > > Ivan Rododendro
> > > > > >
> > > > > > > Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske <
> > > angusyoung@gmail.com>
> > > > a
> > > > > > écrit :
> > > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > From the code you provided, it's not very clear to me when and
> > > where
> > > > > you
> > > > > > > are calling the commit. Also it's not very clear to me: which
> > > version
> > > > > of
> > > > > > > Camel you are using and which kind of commit factory you are
> > using
> > > > > (async
> > > > > > > [1] or sync [2]?).
> > > > > > >
> > > > > > > That said ...The problem here is that - as explained in the
> > > exception
> > > > > > > message - the Kafka client cannot be accessed from a different
> > > > thread.
> > > > > > >
> > > > > > > So, I am not entirely sure that the problem is related to seda
> or
> > > > > > something
> > > > > > > like that. Also, Camel will indeed, create a different consumer
> > for
> > > > > every
> > > > > > > route.
> > > > > > >
> > > > > > > Please, can you provide a bit more details about the code you
> > have?
> > > > > > >
> > > > > > > 1.
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> > > > > > > 2.
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> > > > > > >
> > > > > > >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <
> > > > > > ivan.rododendro@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> Hello
> > > > > > >> I'm really new to Camel concepts, our need is to create some
> > > > identical
> > > > > > >> routes, identical except for some parameters, from a Kafka
> topic
> > > to
> > > > a
> > > > > > http
> > > > > > >> endpoint, with some processing in-between.
> > > > > > >>
> > > > > > >> Besides this we want to explicitly commit the message
> > consumption
> > > > only
> > > > > > when
> > > > > > >> the http endpoint has been successfully called.
> > > > > > >>
> > > > > > >> In order to achieve this we set up a route template that
> carries
> > > the
> > > > > > Route
> > > > > > >> parameterization and set it up to manually commit after having
> > > > called
> > > > > > the
> > > > > > >> http endpoint :
> > > > > > >> public void configure() throws Exception {
> > > > > > >>        // @formatter:off
> > > > > > >>        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> > > > > > >>            .templateParameter(Constantes.JOB_NAME)
> > > > > > >>            .templateParameter(Constantes.TOPIC)
> > > > > > >>            .templateParameter(Constantes.PUBLISHER_ID)
> > > > > > >>
> > .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > > > >>            .templateParameter(Constantes.JOB_NAME_PARAMETER)
> > > > > > >>
> > .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > > > >>            .from(getKafkaEndpoint())
> > > > > > >>            .messageHistory()
> > > > > > >>            .filter(simple("${header.publisherId} ==
> > > > > '{{publisherId}}'"))
> > > > > > >>            .process(messageLoggerProcessor)
> > > > > > >>            .process(modelMapperProcessor)
> > > > > > >>            .process(jsonlToArrayProcessor)
> > > > > > >>            .process(payloadProcessor)
> > > > > > >>
> > > > > > >>
> > > > >
> > >
> .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> > > > > > >>            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> > > > > > >>            .setHeader(Exchange.CONTENT_TYPE,
> > > > > > >> constant("application/json;charset=UTF-8"))
> > > > > > >>
> > > > .setHeader(Constantes.ACCEPT,constant("application/json"))
> > > > > > >>            .setHeader(Constantes.API_KEY, constant(apiKey))
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> > > > > > >>            .process(apiConsumerProcessorLogger)
> > > > > > >>            .to(this.url)
> > > > > > >>            .process(kafkaOffsetProcessor);
> > > > > > >>        // @formatter:on
> > > > > > >>    }
> > > > > > >>
> > > > > > >>    private String getKafkaEndpoint() {
> > > > > > >>        String endpoint =
> > > > > > >>
> > > > > >
> > > >
> > "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
> > > > > +
> > > > > > >> kafkaBrokers;
> > > > > > >>
> > > > > > >>        if (securityEnabled()) {
> > > > > > >>            endpoint += "&securityProtocol=SASL_SSL" +
> > > > > > >> "&saslMechanism=PLAIN"
> > > > > > >>                    +
> > > > > > >>
> > > > >
> > >
> "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
> > > > > > >> required username=\""
> > > > > > >>                    + username + "\" password=\"" + password +
> > > "\";"
> > > > +
> > > > > > >> "&sslTruststoreLocation=" + sslTrustStoreLocation
> > > > > > >>                    + "&sslTruststorePassword=" +
> > > > > sslTruststorePassword;
> > > > > > >>        }
> > > > > > >>
> > > > > > >>        return endpoint;
> > > > > > >>    }
> > > > > > >>
> > > > > > >> The problem is that we systematically get this error when a
> > > message
> > > > is
> > > > > > >> consumed by a route :
> > > > > > >>
> > > > > > >> Trace: java.util.ConcurrentModificationException:
> KafkaConsumer
> > is
> > > > not
> > > > > > >> safe for multi-threaded access
> > > > > > >>    at
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> > > > > > >>    at
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> > > > > > >>    at
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> > > > > > >>    at
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> > > > > > >>    at
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> > > > > > >>
> > > > > > >>
> > > > > > >> My understanding is that the instance of KafkaConsumer is
> reused
> > > in
> > > > > > >> multiple routes and therefore it generates the error, but it
> > could
> > > > be
> > > > > > also
> > > > > > >> related to using SEDA endpoint as stated here (
> > > > > > >> https://issues.apache.org/jira/browse/CAMEL-12722), which we
> > > don't
> > > > > > >> explicitly do.
> > > > > > >>
> > > > > > >> We tried injecting a KafkaComponent local bean in the route :
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
> > > > > > >> "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
> > > > > > >>            .end()
> > > > > > >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
> > > > > > >> "#{{myKafkaConfiguration}}")
> > > > > > >>            .end()
> > > > > > >>            .from("#{{myKafka}}")
> > > > > > >>
> > > > > > >> But it ends up with another error because you cannot consume a
> > > Bean
> > > > > > >> endpoint (
> > > > > > https://camel.apache.org/components/3.18.x/bean-component.html)
> > > > > > >>
> > > > > > >> How to use a different KafkaConsumer for every created route ?
> > Or,
> > > > if
> > > > > > the
> > > > > > >> issue is SEDA related, how to make this route a direct route?
> > > > > > >>
> > > > > > >> Thank you for your help
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Otavio R. Piske
> > > > > > > http://orpiske.net
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Otavio R. Piske
> > > > > http://orpiske.net
> > > > >
> > > >
> > >
> > >
> > > --
> > > Otavio R. Piske
> > > http://orpiske.net
> > >
> >
>
>
> --
> Otavio R. Piske
> http://orpiske.net
>

Re: Camel and Kafka manual commit : java.util.ConcurrentModificationException

Posted by Otavio Rodolfo Piske <an...@gmail.com>.
Hi,

Thanks. I have to be honest with you: I truly want to look more closely at
this one, but it's been a bit hard to make sense of the code you've provided
so far. It's not something I can reproduce by quickly modifying one of our
unit tests and running it.

In this case ...

Please, can you provide a full reproducer and send the code? Please put it
on GitHub, so I can clone it, reproduce the problem, and debug it. That would
make investigating and fixing this much easier and quicker for me.

Kind regards

On Tue, Nov 22, 2022 at 12:37 PM Ivan Rododendro <iv...@gmail.com>
wrote:

> Hi Octavio
> I've been  busy ...
>
> I upgraded to Came 3.18.3 :
> 22-11-2022 12:35:10.829 [restartedMain] INFO
>  o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel 3.18.3
> (camel-1) is starting
>
> Still I have the error :
> CaughtExceptionType: java.util.ConcurrentModificationException
>  CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
> access  StackTrace: java.util.ConcurrentModificationException:
> KafkaConsumer is not safe for multi-threaded access
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491)
> at
>
> org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89)
> at
>
> org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31)
> at
>
> fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18)
> at
>
> org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
> at
>
> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818)
> at
>
> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726)
> at
>
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
> at
>
> org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
> at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
> at
>
> org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
> at
>
> org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320)
> at
>
> org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560)
> at
>
> org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483)
>
> Ivan
>
> On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske <an...@gmail.com>
> wrote:
>
> > Hi, thanks
> >
> > Please, can you try with Camel 3.18.3? Along with the rest of the
> > community, we introduced a lot of fixes on the Kafka component on 3.18.2
> > and 3.18.3. Maybe it will help you solve the problem (and, at the same
> > time, it's an LTS version, so we can fix it if there's a problem).
> >
> > Thanks in advance
> >
> > On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <
> ivan.rododendro@gmail.com>
> > wrote:
> >
> > > Hi Otavio,
> > > Camel version is 3.17.0
> > >
> > > thank you
> > >
> > > On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <
> > angusyoung@gmail.com>
> > > wrote:
> > >
> > > > Hi Ivan,
> > > >
> > > > For Kafka (consumer) objects, yes, they should be created for every
> > > route.
> > > > I want to try to take a look at it during this week or in the next.
> > > >
> > > > In the meantime, can you please tell me which version of Camel you
> are
> > > > using?
> > > >
> > > > Kind regards
> > > >
> > > > On Wed, Oct 26, 2022 at 6:50 PM Ivan <iv...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Oktavio thank you for your response.
> > > > >
> > > > > The commit is made synchronously by the kafkaOffsetProcessor (whose
> > > code
> > > > I
> > > > > forgot to attach, more details here
> > > > >
> > > >
> > >
> >
> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> > > > ).
> > > > > Basically it does what the documentation says for synchronous
> > commits.
> > > > >
> > > > > Multiple threads cannot access the same kafka client that’s it, but
> > > camel
> > > > > handles instances and threads, so not so easy to fix for me.
> > > > >
> > > > > I think about seda because I know that for seda endpoint object are
> > > > pooled
> > > > > (if my understanding is right).
> > > > >
> > > > > Are you sure that new objects are created for every route created
> > from
> > > a
> > > > > template ?
> > > > >
> > > > > Ivan Rododendro
> > > > >
> > > > > > Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske <
> > angusyoung@gmail.com>
> > > a
> > > > > écrit :
> > > > > >
> > > > > > Hi,
> > > > > >
> > > > > > From the code you provided, it's not very clear to me when and
> > where
> > > > you
> > > > > > are calling the commit. Also it's not very clear to me: which
> > version
> > > > of
> > > > > > Camel you are using and which kind of commit factory you are
> using
> > > > (async
> > > > > > [1] or sync [2]?).
> > > > > >
> > > > > > That said ...The problem here is that - as explained in the
> > exception
> > > > > > message - the Kafka client cannot be accessed from a different
> > > thread.
> > > > > >
> > > > > > So, I am not entirely sure that the problem is related to seda or
> > > > > something
> > > > > > like that. Also, Camel will indeed, create a different consumer
> for
> > > > every
> > > > > > route.
> > > > > >
> > > > > > Please, can you provide a bit more details about the code you
> have?
> > > > > >
> > > > > > 1.
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> > > > > > 2.
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> > > > > >
> > > > > >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <
> > > > > ivan.rododendro@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >> Hello
> > > > > >> I'm really new to Camel concepts, our need is to create some
> > > identical
> > > > > >> routes, identical except for some parameters, from a Kafka topic
> > to
> > > a
> > > > > http
> > > > > >> endpoint, with some processing in-between.
> > > > > >>
> > > > > >> Besides this we want to explicitly commit the message
> consumption
> > > only
> > > > > when
> > > > > >> the http endpoint has been successfully called.
> > > > > >>
> > > > > >> In order to achieve this we set up a route template that carries
> > the
> > > > > Route
> > > > > >> parameterization and set it up to manually commit after having
> > > called
> > > > > the
> > > > > >> http endpoint :
> > > > > >> public void configure() throws Exception {
> > > > > >>        // @formatter:off
> > > > > >>        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> > > > > >>            .templateParameter(Constantes.JOB_NAME)
> > > > > >>            .templateParameter(Constantes.TOPIC)
> > > > > >>            .templateParameter(Constantes.PUBLISHER_ID)
> > > > > >>
> .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > > >>            .templateParameter(Constantes.JOB_NAME_PARAMETER)
> > > > > >>
> .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > > >>            .from(getKafkaEndpoint())
> > > > > >>            .messageHistory()
> > > > > >>            .filter(simple("${header.publisherId} ==
> > > > '{{publisherId}}'"))
> > > > > >>            .process(messageLoggerProcessor)
> > > > > >>            .process(modelMapperProcessor)
> > > > > >>            .process(jsonlToArrayProcessor)
> > > > > >>            .process(payloadProcessor)
> > > > > >>
> > > > > >>
> > > >
> > .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> > > > > >>            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> > > > > >>            .setHeader(Exchange.CONTENT_TYPE,
> > > > > >> constant("application/json;charset=UTF-8"))
> > > > > >>
> > > .setHeader(Constantes.ACCEPT,constant("application/json"))
> > > > > >>            .setHeader(Constantes.API_KEY, constant(apiKey))
> > > > > >>
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> > > > > >>            .process(apiConsumerProcessorLogger)
> > > > > >>            .to(this.url)
> > > > > >>            .process(kafkaOffsetProcessor);
> > > > > >>        // @formatter:on
> > > > > >>    }
> > > > > >>
> > > > > >>    private String getKafkaEndpoint() {
> > > > > >>        String endpoint =
> > > > > >>
> > > > >
> > >
> "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
> > > > +
> > > > > >> kafkaBrokers;
> > > > > >>
> > > > > >>        if (securityEnabled()) {
> > > > > >>            endpoint += "&securityProtocol=SASL_SSL" +
> > > > > >> "&saslMechanism=PLAIN"
> > > > > >>                    +
> > > > > >>
> > > >
> > "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
> > > > > >> required username=\""
> > > > > >>                    + username + "\" password=\"" + password +
> > "\";"
> > > +
> > > > > >> "&sslTruststoreLocation=" + sslTrustStoreLocation
> > > > > >>                    + "&sslTruststorePassword=" +
> > > > sslTruststorePassword;
> > > > > >>        }
> > > > > >>
> > > > > >>        return endpoint;
> > > > > >>    }
> > > > > >>
> > > > > >> The problem is that we systematically get this error when a
> > message
> > > is
> > > > > >> consumed by a route :
> > > > > >>
> > > > > >> Trace: java.util.ConcurrentModificationException: KafkaConsumer
> is
> > > not
> > > > > >> safe for multi-threaded access
> > > > > >>    at
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> > > > > >>    at
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> > > > > >>    at
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> > > > > >>    at
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> > > > > >>    at
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> > > > > >>
> > > > > >>
> > > > > >> My understanding is that the instance of KafkaConsumer is reused
> > in
> > > > > >> multiple routes and therefore it generates the error, but it
> could
> > > be
> > > > > also
> > > > > >> related to using SEDA endpoint as stated here (
> > > > > >> https://issues.apache.org/jira/browse/CAMEL-12722), which we
> > don't
> > > > > >> explicitly do.
> > > > > >>
> > > > > >> We tried injecting a KafkaComponent local bean in the route :
> > > > > >>
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
> > > > > >> "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
> > > > > >>            .end()
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
> > > > > >> "#{{myKafkaConfiguration}}")
> > > > > >>            .end()
> > > > > >>            .from("#{{myKafka}}")
> > > > > >>
> > > > > >> But it ends up with another error because you cannot consume a
> > Bean
> > > > > >> endpoint (
> > > > > https://camel.apache.org/components/3.18.x/bean-component.html)
> > > > > >>
> > > > > >> How to use a different KafkaConsumer for every created route ?
> Or,
> > > if
> > > > > the
> > > > > >> issue is SEDA related, how to make this route a direct route?
> > > > > >>
> > > > > >> Thank you for your help
> > > > > >>
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Otavio R. Piske
> > > > > > http://orpiske.net
> > > > >
> > > >
> > > >
> > > > --
> > > > Otavio R. Piske
> > > > http://orpiske.net
> > > >
> > >
> >
> >
> > --
> > Otavio R. Piske
> > http://orpiske.net
> >
>


-- 
Otavio R. Piske
http://orpiske.net

Re: Camel and Kafka manual commit : java.util.ConcurrentModificationException

Posted by Ivan Rododendro <iv...@gmail.com>.
Hi Otavio,
I've been busy ...

I upgraded to Camel 3.18.3:
22-11-2022 12:35:10.829 [restartedMain] INFO
 o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel 3.18.3
(camel-1) is starting

I still have the error:
CaughtExceptionType: java.util.ConcurrentModificationException
 CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
access  StackTrace: java.util.ConcurrentModificationException:
KafkaConsumer is not safe for multi-threaded access
at
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450)
at
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434)
at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491)
at
org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89)
at
org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31)
at
fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18)
at
org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
at
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818)
at
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
at
org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
at
org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320)
at
org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560)
at
org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483)
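
(For reference, the KafkaOffsetProcessor at the top of the trace only does the
synchronous manual commit. I still haven't attached its source, but a minimal
sketch looks like the code below; the import packages are my assumption for
3.18.x, where the manual-commit classes moved under
org.apache.camel.component.kafka.consumer.)

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit; // assumed location in 3.18.x

// hypothetical sketch of a manual-commit processor, not the real KafkaOffsetProcessor
public class ManualCommitProcessor implements Processor {

    @Override
    public void process(Exchange exchange) {
        // camel-kafka stores the manual-commit handle on the exchange when
        // allowManualCommit=true; committing advances the offset for this partition
        KafkaManualCommit manual = exchange.getIn()
                .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
        if (manual != null) {
            manual.commit();
        }
    }
}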

Ivan

On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske <an...@gmail.com>
wrote:

> Hi, thanks
>
> Please, can you try with Camel 3.18.3? Along with the rest of the
> community, we introduced a lot of fixes on the Kafka component on 3.18.2
> and 3.18.3. Maybe it will help you solve the problem (and, at the same
> time, it's an LTS version, so we can fix it if there's a problem).
>
> Thanks in advance
>
> On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <iv...@gmail.com>
> wrote:
>
> > Hi Otavio,
> > Camel version is 3.17.0
> >
> > thank you
> >
> > On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <
> angusyoung@gmail.com>
> > wrote:
> >
> > > Hi Ivan,
> > >
> > > For Kafka (consumer) objects, yes, they should be created for every
> > route.
> > > I want to try to take a look at it during this week or in the next.
> > >
> > > In the meantime, can you please tell me which version of Camel you are
> > > using?
> > >
> > > Kind regards
> > >
> > > On Wed, Oct 26, 2022 at 6:50 PM Ivan <iv...@gmail.com>
> wrote:
> > >
> > > > Hi Oktavio thank you for your response.
> > > >
> > > > The commit is made synchronously by the kafkaOffsetProcessor (whose
> > code
> > > I
> > > > forgot to attach, more details here
> > > >
> > >
> >
> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> > > ).
> > > > Basically it does what the documentation says for synchronous
> commits.
> > > >
> > > > Multiple threads cannot access the same kafka client that’s it, but
> > camel
> > > > handles instances and threads, so not so easy to fix for me.
> > > >
> > > > I think about seda because I know that for seda endpoint object are
> > > pooled
> > > > (if my understanding is right).
> > > >
> > > > Are you sure that new objects are created for every route created
> from
> > a
> > > > template ?
> > > >
> > > > Ivan Rododendro
> > > >
> > > > > Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske <
> angusyoung@gmail.com>
> > a
> > > > écrit :
> > > > >
> > > > > Hi,
> > > > >
> > > > > From the code you provided, it's not very clear to me when and
> where
> > > you
> > > > > are calling the commit. Also it's not very clear to me: which
> version
> > > of
> > > > > Camel you are using and which kind of commit factory you are using
> > > (async
> > > > > [1] or sync [2]?).
> > > > >
> > > > > That said ...The problem here is that - as explained in the
> exception
> > > > > message - the Kafka client cannot be accessed from a different
> > thread.
> > > > >
> > > > > So, I am not entirely sure that the problem is related to seda or
> > > > something
> > > > > like that. Also, Camel will indeed, create a different consumer for
> > > every
> > > > > route.
> > > > >
> > > > > Please, can you provide a bit more details about the code you have?
> > > > >
> > > > > 1.
> > > > >
> > > >
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> > > > > 2.
> > > > >
> > > >
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> > > > >
> > > > >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <
> > > > ivan.rododendro@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> Hello
> > > > >> I'm really new to Camel concepts, our need is to create some
> > identical
> > > > >> routes, identical except for some parameters, from a Kafka topic
> to
> > a
> > > > http
> > > > >> endpoint, with some processing in-between.
> > > > >>
> > > > >> Besides this we want to explicitly commit the message consumption
> > only
> > > > when
> > > > >> the http endpoint has been successfully called.
> > > > >>
> > > > >> In order to achieve this we set up a route template that carries
> the
> > > > Route
> > > > >> parameterization and set it up to manually commit after having
> > called
> > > > the
> > > > >> http endpoint :
> > > > >> public void configure() throws Exception {
> > > > >>        // @formatter:off
> > > > >>        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> > > > >>            .templateParameter(Constantes.JOB_NAME)
> > > > >>            .templateParameter(Constantes.TOPIC)
> > > > >>            .templateParameter(Constantes.PUBLISHER_ID)
> > > > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > >>            .templateParameter(Constantes.JOB_NAME_PARAMETER)
> > > > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > >>            .from(getKafkaEndpoint())
> > > > >>            .messageHistory()
> > > > >>            .filter(simple("${header.publisherId} ==
> > > '{{publisherId}}'"))
> > > > >>            .process(messageLoggerProcessor)
> > > > >>            .process(modelMapperProcessor)
> > > > >>            .process(jsonlToArrayProcessor)
> > > > >>            .process(payloadProcessor)
> > > > >>
> > > > >>
> > >
> .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> > > > >>            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> > > > >>            .setHeader(Exchange.CONTENT_TYPE,
> > > > >> constant("application/json;charset=UTF-8"))
> > > > >>
> > .setHeader(Constantes.ACCEPT,constant("application/json"))
> > > > >>            .setHeader(Constantes.API_KEY, constant(apiKey))
> > > > >>
> > > > >>
> > > > >>
> > > >
> > >
> >
> .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> > > > >>            .process(apiConsumerProcessorLogger)
> > > > >>            .to(this.url)
> > > > >>            .process(kafkaOffsetProcessor);
> > > > >>        // @formatter:on
> > > > >>    }
> > > > >>
> > > > >>    private String getKafkaEndpoint() {
> > > > >>        String endpoint =
> > > > >>
> > > >
> > "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
> > > +
> > > > >> kafkaBrokers;
> > > > >>
> > > > >>        if (securityEnabled()) {
> > > > >>            endpoint += "&securityProtocol=SASL_SSL" +
> > > > >> "&saslMechanism=PLAIN"
> > > > >>                    +
> > > > >>
> > >
> "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
> > > > >> required username=\""
> > > > >>                    + username + "\" password=\"" + password +
> "\";"
> > +
> > > > >> "&sslTruststoreLocation=" + sslTrustStoreLocation
> > > > >>                    + "&sslTruststorePassword=" +
> > > sslTruststorePassword;
> > > > >>        }
> > > > >>
> > > > >>        return endpoint;
> > > > >>    }
> > > > >>
> > > > >> The problem is that we systematically get this error when a
> message
> > is
> > > > >> consumed by a route :
> > > > >>
> > > > >> Trace: java.util.ConcurrentModificationException: KafkaConsumer is
> > not
> > > > >> safe for multi-threaded access
> > > > >>    at
> > > > >>
> > > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> > > > >>    at
> > > > >>
> > > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> > > > >>    at
> > > > >>
> > > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> > > > >>    at
> > > > >>
> > > >
> > >
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> > > > >>    at
> > > > >>
> > > >
> > >
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> > > > >>
> > > > >>
> > > > >> My understanding is that the instance of KafkaConsumer is reused
> in
> > > > >> multiple routes and therefore it generates the error, but it could
> > be
> > > > also
> > > > >> related to using SEDA endpoint as stated here (
> > > > >> https://issues.apache.org/jira/browse/CAMEL-12722), which we
> don't
> > > > >> explicitly do.
> > > > >>
> > > > >> We tried injecting a KafkaComponent local bean in the route :
> > > > >>
> > > > >>
> > > > >>
> > > >
> > >
> >
> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
> > > > >> "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
> > > > >>            .end()
> > > > >>
> > > > >>
> > > >
> > >
> >
> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
> > > > >> "#{{myKafkaConfiguration}}")
> > > > >>            .end()
> > > > >>            .from("#{{myKafka}}")
> > > > >>
> > > > >> But it ends up with another error because you cannot consume a
> Bean
> > > > >> endpoint (
> > > > https://camel.apache.org/components/3.18.x/bean-component.html)
> > > > >>
> > > > >> How to use a different KafkaConsumer for every created route ? Or,
> > if
> > > > the
> > > > >> issue is SEDA related, how to make this route a direct route?
> > > > >>
> > > > >> Thank you for your help
> > > > >>
> > > > >
> > > > >
> > > > > --
> > > > > Otavio R. Piske
> > > > > http://orpiske.net
> > > >
> > >
> > >
> > > --
> > > Otavio R. Piske
> > > http://orpiske.net
> > >
> >
>
>
> --
> Otavio R. Piske
> http://orpiske.net
>

Re: Camel and Kafka manual commit : java.util.ConcurrentModificationException

Posted by Otavio Rodolfo Piske <an...@gmail.com>.
Hi, thanks

Please, can you try with Camel 3.18.3? Along with the rest of the
community, we introduced a lot of fixes to the Kafka component in 3.18.2
and 3.18.3. Maybe it will help you solve the problem (and, at the same
time, it's an LTS version, so we can fix it if there's still a problem).

Thanks in advance

On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <iv...@gmail.com>
wrote:

> Hi Otavio,
> Camel version is 3.17.0
>
> thank you
>
> On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <an...@gmail.com>
> wrote:
>
> > Hi Ivan,
> >
> > For Kafka (consumer) objects, yes, they should be created for every
> route.
> > I want to try to take a look at it during this week or in the next.
> >
> > In the meantime, can you please tell me which version of Camel you are
> > using?
> >
> > Kind regards
> >
> > On Wed, Oct 26, 2022 at 6:50 PM Ivan <iv...@gmail.com> wrote:
> >
> > > Hi Oktavio thank you for your response.
> > >
> > > The commit is made synchronously by the kafkaOffsetProcessor (whose
> code
> > I
> > > forgot to attach, more details here
> > >
> >
> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> > ).
> > > Basically it does what the documentation says for synchronous commits.
> > >
> > > Multiple threads cannot access the same kafka client that’s it, but
> camel
> > > handles instances and threads, so not so easy to fix for me.
> > >
> > > I think about seda because I know that for seda endpoint object are
> > pooled
> > > (if my understanding is right).
> > >
> > > Are you sure that new objects are created for every route created from
> a
> > > template ?
> > >
> > > Ivan Rododendro
> > >
> > > > Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske <an...@gmail.com>
> a
> > > écrit :
> > > >
> > > > Hi,
> > > >
> > > > From the code you provided, it's not very clear to me when and where
> > you
> > > > are calling the commit. Also it's not very clear to me: which version
> > of
> > > > Camel you are using and which kind of commit factory you are using
> > (async
> > > > [1] or sync [2]?).
> > > >
> > > > That said ...The problem here is that - as explained in the exception
> > > > message - the Kafka client cannot be accessed from a different
> thread.
> > > >
> > > > So, I am not entirely sure that the problem is related to seda or
> > > something
> > > > like that. Also, Camel will indeed, create a different consumer for
> > every
> > > > route.
> > > >
> > > > Please, can you provide a bit more details about the code you have?
> > > >
> > > > 1.
> > > >
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> > > > 2.
> > > >
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> > > >
> > > >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <
> > > ivan.rododendro@gmail.com>
> > > >> wrote:
> > > >>
> > > >> Hello
> > > >> I'm really new to Camel concepts, our need is to create some
> identical
> > > >> routes, identical except for some parameters, from a Kafka topic to
> a
> > > http
> > > >> endpoint, with some processing in-between.
> > > >>
> > > >> Besides this we want to explicitly commit the message consumption
> only
> > > when
> > > >> the http endpoint has been successfully called.
> > > >>
> > > >> In order to achieve this we set up a route template that carries the
> > > Route
> > > >> parameterization and set it up to manually commit after having
> called
> > > the
> > > >> http endpoint :
> > > >> public void configure() throws Exception {
> > > >>        // @formatter:off
> > > >>        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> > > >>            .templateParameter(Constantes.JOB_NAME)
> > > >>            .templateParameter(Constantes.TOPIC)
> > > >>            .templateParameter(Constantes.PUBLISHER_ID)
> > > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > >>            .templateParameter(Constantes.JOB_NAME_PARAMETER)
> > > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > >>            .from(getKafkaEndpoint())
> > > >>            .messageHistory()
> > > >>            .filter(simple("${header.publisherId} ==
> > '{{publisherId}}'"))
> > > >>            .process(messageLoggerProcessor)
> > > >>            .process(modelMapperProcessor)
> > > >>            .process(jsonlToArrayProcessor)
> > > >>            .process(payloadProcessor)
> > > >>
> > > >>
> > .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> > > >>            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> > > >>            .setHeader(Exchange.CONTENT_TYPE,
> > > >> constant("application/json;charset=UTF-8"))
> > > >>
> .setHeader(Constantes.ACCEPT,constant("application/json"))
> > > >>            .setHeader(Constantes.API_KEY, constant(apiKey))
> > > >>
> > > >>
> > > >>
> > >
> >
> .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> > > >>            .process(apiConsumerProcessorLogger)
> > > >>            .to(this.url)
> > > >>            .process(kafkaOffsetProcessor);
> > > >>        // @formatter:on
> > > >>    }
> > > >>
> > > >>    private String getKafkaEndpoint() {
> > > >>        String endpoint =
> > > >>
> > >
> "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
> > +
> > > >> kafkaBrokers;
> > > >>
> > > >>        if (securityEnabled()) {
> > > >>            endpoint += "&securityProtocol=SASL_SSL" +
> > > >> "&saslMechanism=PLAIN"
> > > >>                    +
> > > >>
> > "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
> > > >> required username=\""
> > > >>                    + username + "\" password=\"" + password + "\";"
> +
> > > >> "&sslTruststoreLocation=" + sslTrustStoreLocation
> > > >>                    + "&sslTruststorePassword=" +
> > sslTruststorePassword;
> > > >>        }
> > > >>
> > > >>        return endpoint;
> > > >>    }
> > > >>
> > > >> The problem is that we systematically get this error when a message
> is
> > > >> consumed by a route :
> > > >>
> > > >> Trace: java.util.ConcurrentModificationException: KafkaConsumer is
> not
> > > >> safe for multi-threaded access
> > > >>    at
> > > >>
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> > > >>    at
> > > >>
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> > > >>    at
> > > >>
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> > > >>    at
> > > >>
> > >
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> > > >>    at
> > > >>
> > >
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> > > >>
> > > >>
> > > >> My understanding is that the instance of KafkaConsumer is reused in
> > > >> multiple routes and therefore it generates the error, but it could
> be
> > > also
> > > >> related to using SEDA endpoint as stated here (
> > > >> https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
> > > >> explicitly do.
> > > >>
> > > >> We tried injecting a KafkaComponent local bean in the route :
> > > >>
> > > >>
> > > >>
> > >
> >
> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
> > > >> "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
> > > >>            .end()
> > > >>
> > > >>
> > >
> >
> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
> > > >> "#{{myKafkaConfiguration}}")
> > > >>            .end()
> > > >>            .from("#{{myKafka}}")
> > > >>
> > > >> But it ends up with another error because you cannot consume a Bean
> > > >> endpoint (
> > > https://camel.apache.org/components/3.18.x/bean-component.html)
> > > >>
> > > >> How to use a different KafkaConsumer for every created route ? Or,
> if
> > > the
> > > >> issue is SEDA related, how to make this route a direct route?
> > > >>
> > > >> Thank you for your help
> > > >>
> > > >
> > > >
> > > > --
> > > > Otavio R. Piske
> > > > http://orpiske.net
> > >
> >
> >
> > --
> > Otavio R. Piske
> > http://orpiske.net
> >
>


-- 
Otavio R. Piske
http://orpiske.net

Re: Camel and Kafka manual commit : java.util.ConcurrentModificationException

Posted by Ivan Rododendro <iv...@gmail.com>.
Hi Otavio,
Camel version is 3.17.0

thank you

On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <an...@gmail.com>
wrote:

> Hi Ivan,
>
> For Kafka (consumer) objects, yes, they should be created for every route.
> I want to try to take a look at it during this week or in the next.
>
> In the meantime, can you please tell me which version of Camel you are
> using?
>
> Kind regards
>
> On Wed, Oct 26, 2022 at 6:50 PM Ivan <iv...@gmail.com> wrote:
>
> > Hi Oktavio thank you for your response.
> >
> > The commit is made synchronously by the kafkaOffsetProcessor (whose code
> I
> > forgot to attach, more details here
> >
> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> ).
> > Basically it does what the documentation says for synchronous commits.
> >
> > Multiple threads cannot access the same kafka client that’s it, but camel
> > handles instances and threads, so not so easy to fix for me.
> >
> > I think about seda because I know that for seda endpoint object are
> pooled
> > (if my understanding is right).
> >
> > Are you sure that new objects are created for every route created from a
> > template ?
> >
> > Ivan Rododendro
> >
> > > Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske <an...@gmail.com> a
> > écrit :
> > >
> > > Hi,
> > >
> > > From the code you provided, it's not very clear to me when and where
> you
> > > are calling the commit. Also it's not very clear to me: which version
> of
> > > Camel you are using and which kind of commit factory you are using
> (async
> > > [1] or sync [2]?).
> > >
> > > That said ...The problem here is that - as explained in the exception
> > > message - the Kafka client cannot be accessed from a different thread.
> > >
> > > So, I am not entirely sure that the problem is related to seda or
> > something
> > > like that. Also, Camel will indeed, create a different consumer for
> every
> > > route.
> > >
> > > Please, can you provide a bit more details about the code you have?
> > >
> > > 1.
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> > > 2.
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> > >
> > >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <
> > ivan.rododendro@gmail.com>
> > >> wrote:
> > >>
> > >> Hello
> > >> I'm really new to Camel concepts, our need is to create some identical
> > >> routes, identical except for some parameters, from a Kafka topic to a
> > http
> > >> endpoint, with some processing in-between.
> > >>
> > >> Besides this we want to explicitly commit the message consumption only
> > when
> > >> the http endpoint has been successfully called.
> > >>
> > >> In order to achieve this we set up a route template that carries the
> > Route
> > >> parameterization and set it up to manually commit after having called
> > the
> > >> http endpoint :
> > >> public void configure() throws Exception {
> > >>        // @formatter:off
> > >>        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> > >>            .templateParameter(Constantes.JOB_NAME)
> > >>            .templateParameter(Constantes.TOPIC)
> > >>            .templateParameter(Constantes.PUBLISHER_ID)
> > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > >>            .templateParameter(Constantes.JOB_NAME_PARAMETER)
> > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > >>            .from(getKafkaEndpoint())
> > >>            .messageHistory()
> > >>            .filter(simple("${header.publisherId} ==
> '{{publisherId}}'"))
> > >>            .process(messageLoggerProcessor)
> > >>            .process(modelMapperProcessor)
> > >>            .process(jsonlToArrayProcessor)
> > >>            .process(payloadProcessor)
> > >>
> > >>
> .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> > >>            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> > >>            .setHeader(Exchange.CONTENT_TYPE,
> > >> constant("application/json;charset=UTF-8"))
> > >>            .setHeader(Constantes.ACCEPT,constant("application/json"))
> > >>            .setHeader(Constantes.API_KEY, constant(apiKey))
> > >>
> > >>
> > >>
> >
> .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> > >>            .process(apiConsumerProcessorLogger)
> > >>            .to(this.url)
> > >>            .process(kafkaOffsetProcessor);
> > >>        // @formatter:on
> > >>    }
> > >>
> > >>    private String getKafkaEndpoint() {
> > >>        String endpoint =
> > >>
> > "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
> +
> > >> kafkaBrokers;
> > >>
> > >>        if (securityEnabled()) {
> > >>            endpoint += "&securityProtocol=SASL_SSL" +
> > >> "&saslMechanism=PLAIN"
> > >>                    +
> > >>
> "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
> > >> required username=\""
> > >>                    + username + "\" password=\"" + password + "\";" +
> > >> "&sslTruststoreLocation=" + sslTrustStoreLocation
> > >>                    + "&sslTruststorePassword=" +
> sslTruststorePassword;
> > >>        }
> > >>
> > >>        return endpoint;
> > >>    }
> > >>
> > >> The problem is that we systematically get this error when a message is
> > >> consumed by a route :
> > >>
> > >> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not
> > >> safe for multi-threaded access
> > >>    at
> > >>
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> > >>    at
> > >>
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> > >>    at
> > >>
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> > >>    at
> > >>
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> > >>    at
> > >>
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> > >>
> > >>
> > >> My understanding is that the instance of KafkaConsumer is reused in
> > >> multiple routes and therefore it generates the error, but it could be
> > also
> > >> related to using SEDA endpoint as stated here (
> > >> https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
> > >> explicitly do.
> > >>
> > >> We tried injecting a KafkaComponent local bean in the route :
> > >>
> > >>
> > >>
> >
> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
> > >> "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
> > >>            .end()
> > >>
> > >>
> >
> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
> > >> "#{{myKafkaConfiguration}}")
> > >>            .end()
> > >>            .from("#{{myKafka}}")
> > >>
> > >> But it ends up with another error because you cannot consume a Bean
> > >> endpoint (
> > https://camel.apache.org/components/3.18.x/bean-component.html)
> > >>
> > >> How to use a different KafkaConsumer for every created route ? Or, if
> > the
> > >> issue is SEDA related, how to make this route a direct route?
> > >>
> > >> Thank you for your help
> > >>
> > >
> > >
> > > --
> > > Otavio R. Piske
> > > http://orpiske.net
> >
>
>
> --
> Otavio R. Piske
> http://orpiske.net
>