Posted to users@camel.apache.org by arunas junevicius <ar...@gmail.com> on 2017/12/14 09:30:18 UTC

pollEnrich Kafka messages

Hello,

I've had a hard time trying to poll Kafka messages using Camel 2.20.1.

.setHeader(KafkaConstants.OFFSET, constant("smallest"))
.pollEnrich("kafka:myTopic?brokers=MyHost:1111&groupId=reader", 5000L)
.log("${body}");

All I can find in the logs is

INFO 10688 --- [ qtp43368234-22] o.a.c.c.k.KafkaConsumer : Starting Kafka consumer on topic: myTopic with breakOnFirstError: false
DEBUG 10688 --- [ qtp43368234-22] o.a.c.c.k.KafkaConsumer : Starting consumer: Consumer[kafka://myTopic?brokers=MyHost:1111&groupId=reader]
DEBUG 10688 --- [ qtp43368234-22] o.a.c.c.k.KafkaConsumer : Kafka consumer groupId is reader


After the five-second timeout the body is null. If I use the same URI in
from("kafka:..") it works fine.
From the logs it seems that KafkaFetchRecords is never invoked.
I wasn't able to find any camel-kafka tests covering pollEnrich, or any recent
examples online. Can anyone confirm that it works? What might be the
cause of my problem?

Besides that, could you explain how the camel-kafka component is designed?
pollEnrich can return only a single message, but to my understanding
KafkaConsumer polls in an infinite loop and receives multiple messages
per poll. Does Camel cache those messages? Or does it disconnect
from Kafka after receiving too many messages, so that it doesn't consume
too much memory?

Re: pollEnrich Kafka messages

Posted by arunas junevicius <ar...@gmail.com>.
Yes, it seems like that is the only way for me.
While I'm at it, I would like to understand how pollEnrich works with Kafka.
I was looking at KafkaConsumer, where the messages are polled, and I see
how polling multiple messages from multiple partitions works when
KafkaConsumer is started from from("kafka:.."), but pollEnrich should
return only a single message. Polling is done in a separate thread, so
does that mean there is a cache or an internal queue, and how big can it
get? I'm fairly new to Camel, so maybe I'm not looking in the right place? ;]


ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);
for (TopicPartition partition : allRecords.partitions()) {
    ...
    while (!breakOnErrorHit && recordIterator.hasNext()) {
        record = recordIterator.next();
        ...
        Exchange exchange = endpoint.createKafkaExchange(record);
        ...
        processor.process(exchange);
        ...
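
To make the question concrete, here is a minimal plain-Java sketch of the kind of hand-off I have in mind: a background poll loop pushes each record of a batch into a bounded queue, and an on-demand receive takes exactly one record or gives up after a timeout. This is only a model under my own assumptions, not Camel's code; the class name PollingHandoffSketch, its methods, and the capacity value are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of a one-message receive layered on a batch poll loop. Not Camel source code. */
class PollingHandoffSketch {

    // Bounded buffer between the poll loop and the caller; the capacity is an assumed value.
    private final BlockingQueue<String> queue;

    PollingHandoffSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Called by the poll loop for each polled batch. Blocks while the buffer is full. */
    void onBatch(List<String> records) throws InterruptedException {
        for (String record : records) {
            queue.put(record); // back-pressure: at most `capacity` records are held in memory
        }
    }

    /** On-demand receive: returns one record, or null if none arrives within the timeout. */
    String receive(long timeoutMs) throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Number of records currently waiting in the buffer. */
    int buffered() {
        return queue.size();
    }

    public static void main(String[] args) throws Exception {
        PollingHandoffSketch consumer = new PollingHandoffSketch(2);

        // Stand-in for the separate polling thread; it delivers one batch of three records.
        Thread poller = new Thread(() -> {
            try {
                consumer.onBatch(Arrays.asList("m1", "m2", "m3"));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.setDaemon(true);
        poller.start();

        // Each receive hands back a single record even though the batch held several.
        for (int i = 0; i < 3; i++) {
            System.out.println("received: " + consumer.receive(1000));
        }
        // Nothing is left, so this call waits for the timeout and then yields null.
        System.out.println("after drain: " + consumer.receive(200));
    }
}
```

In this model the memory question has a simple answer: the poll loop blocks on put() once the buffer is full, so the buffer never holds more than its capacity. A null result after the timeout would then mean that nothing was put into the queue in time, which is what I seem to be observing. Whether camel-kafka actually behaves like this is exactly what I am asking.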


On Thu, Dec 14, 2017 at 5:50 PM, Claus Ibsen <cl...@gmail.com> wrote:

> pollEnrich is for polling only one message, on demand, during a route.
>
> If you want to continuously receive all the messages from Kafka, then
> use a route that starts from Kafka; that is the most common thing to
> do.

Re: pollEnrich Kafka messages

Posted by Claus Ibsen <cl...@gmail.com>.
pollEnrich is for polling only one message, on demand, during a route.

If you want to continuously receive all the messages from Kafka, then
use a route that starts from Kafka; that is the most common thing to
do.


-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2