Posted to issues@camel.apache.org by "Claus Ibsen (Jira)" <ji...@apache.org> on 2020/06/23 07:20:00 UTC

[jira] [Updated] (CAMEL-15228) camel-kafka - Missing end of polling signal in Processors

     [ https://issues.apache.org/jira/browse/CAMEL-15228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen updated CAMEL-15228:
--------------------------------
    Summary: camel-kafka - Missing end of polling signal in Processors  (was: Missing end of polling signal in Processors)

> camel-kafka - Missing end of polling signal in Processors
> ---------------------------------------------------------
>
>                 Key: CAMEL-15228
>                 URL: https://issues.apache.org/jira/browse/CAMEL-15228
>             Project: Camel
>          Issue Type: New Feature
>          Components: camel-kafka
>    Affects Versions: 3.4.0
>            Reporter: Jörg
>            Priority: Major
>
> For batch requirements it is currently not possible for a Processor to get a signal for the last record of a poll request (across all partitions and topics).
> The problem is that a Processor cannot know when the last record of a poll has been received and it is time to commit all affected partitions. There is no guarantee that every partition contains data, so the Processor has no information about whether further messages will arrive.
> A possible solution is to set an additional header next to the manual commit handler in [KafkaConsumer.KafkaFetchRecords.doRun|https://github.com/apache/camel/blob/c86c860b7c5b5ef754970291b7ad7c4b32b443c6/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java#L248]
>  
> {code:java}
> exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
> // proposed new header (this constant does not exist yet)
> exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_OF_CURRENT_POLL_REQUEST, last_record_of_last_partition);
> {code}
> LAST_RECORD_OF_CURRENT_POLL_REQUEST should be a boolean that signals the end of the [loop over all partitions|https://github.com/apache/camel/blob/c86c860b7c5b5ef754970291b7ad7c4b32b443c6/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java#L313]
>  
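
The flag described above could be computed roughly as follows. This is only a sketch under stated assumptions, not the camel-kafka implementation: it models one poll result as a plain map from partition name to records instead of using the Kafka client types, and the names LastRecordOfPollSketch, Flagged and flagLastRecord are hypothetical. It also assumes that only partitions that returned at least one record appear in the poll result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: plain collections stand in for the result of one poll.
class LastRecordOfPollSketch {

    /** A record together with the proposed end-of-poll flag. */
    static final class Flagged {
        final String partition;
        final String value;
        final boolean lastOfPoll;

        Flagged(String partition, String value, boolean lastOfPoll) {
            this.partition = partition;
            this.value = value;
            this.lastOfPoll = lastOfPoll;
        }
    }

    /**
     * Walks all partitions of one poll result and marks exactly one record:
     * the last record of the last partition.
     * Assumption: every partition in the map holds at least one record.
     */
    static List<Flagged> flagLastRecord(Map<String, List<String>> recordsByPartition) {
        List<Flagged> out = new ArrayList<>();
        Iterator<Map.Entry<String, List<String>>> partitions =
                recordsByPartition.entrySet().iterator();
        while (partitions.hasNext()) {
            Map.Entry<String, List<String>> partition = partitions.next();
            Iterator<String> records = partition.getValue().iterator();
            while (records.hasNext()) {
                String value = records.next();
                // True only when neither iterator has anything left.
                boolean last = !records.hasNext() && !partitions.hasNext();
                out.add(new Flagged(partition.getKey(), value, last));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> polled = new LinkedHashMap<>();
        polled.put("topic-0", Arrays.asList("a", "b"));
        polled.put("topic-1", Arrays.asList("c"));
        // Only the final record (topic-1 / c) carries the flag.
        for (Flagged f : flagLastRecord(polled)) {
            System.out.println(f.partition + " " + f.value + " last=" + f.lastOfPoll);
        }
    }
}
```

A Processor that sees the flag set to true would then know that the current poll is complete and could commit all affected partitions.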


