You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by "Hart, James W." <jw...@seic.com> on 2019/03/07 16:43:51 UTC

I'm having trouble processing batches of records from kafka in camel. I can't get them in a batch, nor can I assemble them using aggregation.

1. I am trying to process batches of records in camel from kafka, but I am unable to get a batch, nor am I able to aggregate the records myself.  I was expecting to get 5 records in a batch because I set "maxPollRecords=5", and then I could process them in a batch or split and process them individually.  I think that I see that the route did fetch the batch from kafka as on the 5th record I see that the header kafka.LAST_RECORD_BEFORE_COMMIT is true and the others prior are false, so I think that confirms that I got 5 records.  Does this indicate the batch fetch from kafka worked?

2. I also tried to aggregate using an aggregation strategy and end the aggregation on the kafka.LAST_RECORD_BEFORE_COMMIT=true record , but my aggregation strategy never assembled the records.  My camel is rusty as after a 5 year gap I'm back doing camel coding, so it may just be that I'm rusty.

I need to enrich the data, and I am able to enrich at scale by doing batches and running a bulk query from my database, but I can only do this if I can batch them.

My route defined this way in java:
	from(kafka:myTopic?brokers=hosts&groupId=jimtest&autoOffsetReset=earliest&consumerStreams=1&consumersCount=1&autoCommitEnable=false&allowManualCommit=true&maxPartitionFetchBytes=1000000&maxPollIntervalMs=30000&maxPollRecords=5).routeId("FromKafka")
	.process(new SetHeadersForRouting(partitionSetup))
	.aggregate(header("topic.PARTITION"), new ArrayListAggregationStrategy()).completionPredicate(new TopicPartitionPredicate())
	.log("${body}");

Does camel support this batch consuming from kafka and allow me to process them in a batch?
Should I expect that camel would deliver as a batch, and I just need to tweak my configuration, or do I need to aggregate and have just not gotten my aggregation code correct?

I am running camel version 2.22.3 and am running in java 1.8 in a fairly new java version.

Any help would be much appreciated!