You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by "Hart, James W." <jw...@seic.com> on 2019/03/07 16:43:51 UTC
I'm having trouble processing batches of records from kafka in
camel. I can't get them in a batch, nor can I assemble them using
aggregation.
1. I am trying to process batches of records in camel from kafka, but I am unable to get a batch, nor am I able to aggregate the records myself. I was expecting to get 5 records in a batch because I set "maxPollRecords=5", and then I could process them in a batch or split and process them individually. I think that I see that the route did fetch the batch from kafka as on the 5th record I see that the header kafka.LAST_RECORD_BEFORE_COMMIT is true and the others prior are false, so I think that confirms that I got 5 records. Does this indicate the batch fetch from kafka worked?
2. I also tried to aggregate using an aggregation strategy and end the aggregation on the kafka.LAST_RECORD_BEFORE_COMMIT=true record , but my aggregation strategy never assembled the records. My camel is rusty as after a 5 year gap I'm back doing camel coding, so it may just be that I'm rusty.
I need to enrich the data, and I am able to enrich at scale by doing batches and running a bulk query from my database, but I can only do this if I can batch them.
My route defined this way in java:
from(kafka:myTopic?brokers=hosts&groupId=jimtest&autoOffsetReset=earliest&consumerStreams=1&consumersCount=1&autoCommitEnable=false&allowManualCommit=true&maxPartitionFetchBytes=1000000&maxPollIntervalMs=30000&maxPollRecords=5).routeId("FromKafka")
.process(new SetHeadersForRouting(partitionSetup))
.aggregate(header("topic.PARTITION"), new ArrayListAggregationStrategy()).completionPredicate(new TopicPartitionPredicate())
.log("${body}");
Does camel support this batch consuming from kafka and allow me to process them in a batch?
Should I expect that camel would deliver as a batch, and I just need to tweak my configuration, or do I need to aggregate and have just not gotten my aggregation code correct?
I am running camel version 2.22.3 and am running in java 1.8 in a fairly new java version.
Any help would be much appreciated!