Posted to users@camel.apache.org by Burkard Stephan <St...@visana.ch> on 2020/08/24 07:02:44 UTC

Access native KafkaConsumer in Camel RoutePolicy

Hi 

**I also posted this question on StackOverflow** => https://stackoverflow.com/q/63519628/8035582


I use a Camel RoutePolicy to count the consecutive failures in my Camel processing pipeline.

When the failure count reaches a threshold, I want to pause processing for a configured amount of time, because a run of failures probably means that the data from another system is not ready yet, so every message fails.
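The counting logic described above can be sketched in plain Java, independent of Camel and Kafka. This is only an illustration under assumptions: the class name `FailureGate`, its methods, and the choice to pass the current time in as a parameter are all hypothetical and not part of any Camel or Kafka API. A RoutePolicy could call `onSuccess` and `onFailure` when an exchange completes and check `isPaused` to decide when to pause and resume consumption.

```java
/**
 * Hypothetical helper (not a Camel or Kafka class): counts consecutive
 * failures and opens a pause window once a threshold is reached.
 */
final class FailureGate {
    private final int threshold;     // consecutive failures that trigger a pause
    private final long pauseMillis;  // configured length of the pause window
    private int consecutiveFailures = 0;
    private long pausedUntil = Long.MIN_VALUE; // no pause window opened yet

    FailureGate(int threshold, long pauseMillis) {
        if (threshold < 1 || pauseMillis < 0) {
            throw new IllegalArgumentException("threshold must be >= 1 and pauseMillis >= 0");
        }
        this.threshold = threshold;
        this.pauseMillis = pauseMillis;
    }

    /** A successful exchange breaks the run of failures. */
    synchronized void onSuccess() {
        consecutiveFailures = 0;
    }

    /**
     * Records a failure at the given time.
     * Returns true if this failure reached the threshold and opened a pause window.
     */
    synchronized boolean onFailure(long nowMillis) {
        consecutiveFailures++;
        if (consecutiveFailures >= threshold) {
            pausedUntil = nowMillis + pauseMillis;
            consecutiveFailures = 0; // start counting afresh after the pause
            return true;
        }
        return false;
    }

    /** True while the given time lies inside the pause window. */
    synchronized boolean isPaused(long nowMillis) {
        return nowMillis < pausedUntil;
    }
}
```

Passing the clock value in, for example `System.currentTimeMillis()`, keeps the sketch easy to test; it deliberately leaves open how the pause is applied to the consumer, which is the actual question of this post.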

Since the source of my pipeline is a Kafka topic, I should not simply stop the whole route, because the broker would assume my consumer has died and trigger a rebalance.

The best way to "pause" topic consumption seems to be to call pause[1] on the native KafkaConsumer (the Kafka client class, not Camel's own KafkaConsumer). That way, the consumer keeps polling the broker but does not fetch any messages.

**How can I access the native KafkaConsumer from the RoutePolicy context to call the pause and resume methods?**


The spring-kafka listener containers offer these methods[2], but if I used a Spring consumer, I would need to wire it into my Camel routes, and committing messages would get hairy.

[1] https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#pause-java.util.Collection-
[2] https://docs.spring.io/spring-kafka/docs/2.2.14.RELEASE/reference/html/#pause-resume

Thanks 
Stephan