You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Claus Ibsen (JIRA)" <ji...@apache.org> on 2019/08/07 07:38:00 UTC
[jira] [Updated] (CAMEL-13768) Seek to specific offset and
KafkaConsumer access
[ https://issues.apache.org/jira/browse/CAMEL-13768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen updated CAMEL-13768:
--------------------------------
Priority: Major (was: Minor)
> Seek to specific offset and KafkaConsumer access
> -------------------------------------------------
>
> Key: CAMEL-13768
> URL: https://issues.apache.org/jira/browse/CAMEL-13768
> Project: Camel
> Issue Type: New Feature
> Components: camel-kafka
> Affects Versions: 2.24.1
> Reporter: michael elbaz
> Priority: Major
> Fix For: 3.x
>
>
> 1. Provide a way to rewind kafka offset to specific offset (improve seekTo ?) there is no way to do that using camel-kafka component. The main idea is to replay older kafka messages without starting from the beginning.
> for example: https://blog.sysco.no/integration/kafka-rewind-consumers-offset/
> {code:java}
> boolean flag = true;
> while (true) {
> ConsumerRecords<String, String> records = consumer.poll(100);
> if(flag) {
> Map<TopicPartition, Long> query = new HashMap<>();
> query.put(
> new TopicPartition("simple-topic-1", 0),
> Instant.now().minus(10, MINUTES).toEpochMilli());
> // Get offset from timestamp
> Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
> // Rewind offset to previous position using seekTo
> result.entrySet()
> .stream()
> .forEach(entry -> consumer.seek(entry.getKey(), entry.getValue().offset()));
> flag = false;
> }
> for (ConsumerRecord<String, String> record : records)
> System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
> }
> {code}
> 2. Provide a way to access kafkaConsumer
> Add camel header with reference to kafkaConsumer to be able to perform some Kafka api call.We can use the same way that we do with KafkaManualCommit
> {code:java}
> public void process(Exchange exchange) {
> KafkaManualCommit manual =
> exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
> manual.commitSync();
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)