You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by Burkard Stephan <> on 2020/08/24 07:02:44 UTC

Access native KafkaConsumer in Camel RoutePolicy


**I also posted this question on StackOverflow** =>

I "monitor" the number of consecutive failures in my Camel processing pipeline with a Camel RoutePolicy.

When a threshold of failures is reached, I want to pause the processing for a configured amount of time because it probably means that the data from another system is not yet ready and therefore every message fails.

Since the source of my pipeline is a Kafka topic, I should not just stop the whole route because the broker would assume my consumer died and rebalance.

The best way to "pause" topic consumption seems to be to pause[1] the KafkaConsumer (the native, not the one of Camel). Like this, the consumer continues to poll the broker, but it does not fetch any messages. 

**How can I access the native KafkaConsumer from the RoutePolicy context to call the pause and resume methods?**

The spring-kafka listener containers offers these methods[2], but when I use a Spring consumer, I would need to connect it with my Camel routes and it would get hairy to commit messages.

