You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Otavio Rodolfo Piske (Jira)" <ji...@apache.org> on 2021/09/22 09:58:00 UTC

[jira] [Resolved] (CAMEL-13768) Seek to specific offset and KafkaConsumer access

     [ https://issues.apache.org/jira/browse/CAMEL-13768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Otavio Rodolfo Piske resolved CAMEL-13768.
------------------------------------------
    Resolution: Fixed

The changes were merged. Closing.

> Seek to specific offset and KafkaConsumer access 
> -------------------------------------------------
>
>                 Key: CAMEL-13768
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13768
>             Project: Camel
>          Issue Type: New Feature
>          Components: camel-kafka
>    Affects Versions: 2.24.1
>            Reporter: michael elbaz
>            Assignee: Otavio Rodolfo Piske
>            Priority: Major
>             Fix For: 3.x
>
>
> 1. Provide a way to rewind kafka offset to specific offset (improve seekTo ?) there is no way to do that using camel-kafka component. The main idea is to replay older kafka messages without starting from the beginning.
>    for example:  https://blog.sysco.no/integration/kafka-rewind-consumers-offset/
> {code:java}
> boolean flag = true;
> while (true) {
>     ConsumerRecords<String, String> records = consumer.poll(100);
>     if(flag) {
>         Map<TopicPartition, Long> query = new HashMap<>();
>         query.put(
>                 new TopicPartition("simple-topic-1", 0),
>                 Instant.now().minus(10, MINUTES).toEpochMilli());
>         // Get offset from timestamp
>         Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
>         // Rewind offset to previous position using seekTo
>         result.entrySet()
>                 .stream()
>                 .forEach(entry -> consumer.seek(entry.getKey(), entry.getValue().offset()));
>         flag = false;
>     }
>     for (ConsumerRecord<String, String> record : records)
>         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
> }
> {code}
> 2. Provide a way to access kafkaConsumer
>    Add camel header with reference to kafkaConsumer to be able to perform some Kafka api call.We can use the same way that we do with KafkaManualCommit
> {code:java}
> public void process(Exchange exchange) {
>     KafkaManualCommit manual =
>         exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
>     manual.commitSync();
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)