You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Balázs Németh (Jira)" <ji...@apache.org> on 2023/03/29 04:57:00 UTC

[jira] [Commented] (KAFKA-14865) Consumer::poll returns early on with an empty result for highly compacted KTables

    [ https://issues.apache.org/jira/browse/KAFKA-14865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706215#comment-17706215 ] 

Balázs Németh commented on KAFKA-14865:
---------------------------------------

If I would be fixing, I would modify this:

[https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java#L46-L48]

 
{code:java}
Map<TopicPartition, List<ConsumerRecord<K, V>>> recordsMap = records.isEmpty() ? new HashMap<>() : mkMap(mkEntry(partition, records));{code}
 

to this:

 
{code:java}
Map<TopicPartition, List<ConsumerRecord<K, V>>> recordsMap = mkMap(mkEntry(partition, records)); {code}
This seems to be the least intrusive change, but I'm not sure if it causes issues with KAFKA-12980 again or not.

 

> Consumer::poll returns early on with an empty result for highly compacted KTables
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-14865
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14865
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.2.0
>            Reporter: Balázs Németh
>            Priority: Major
>
> This behaviour/regression was introduced by https://issues.apache.org/jira/browse/KAFKA-12980 / [https://github.com/apache/kafka/pull/11046] 
> The issue happens when processing a topic that has huge offset ranges that has been "compacted away". It's the scenario that triggers [https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1266-L1269C21] 
> Before that change this pseudocode worked:
>  
> {code:java}
> for (;;) {
>   ConsumerRecords result = consumer.poll(..);
>   if (result.isEmpty()) {
>     scheduleLater(..);
>     break;
>   }
>   process(result);
> }{code}
> It worked because the poll waited until the first fetch that had a record, or the timeout happened. 
>  
> Now it only waits for the very first fetch. In many cases the returned `Fetch` object return false for `isEmpty()` yet it contains no records. (`positionAdvanced` is true, `records` is empty) [https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java#L102-L104]
> So now it calls the `scheduleLater` after every fetch in those ranges. That might throttle the ingestion a lot.
> Before the change:
> - we have record(s) -> return immediately with a nonempty result
> - we encountered a gap -> iterates over it until a record is found (and return with a nonempty result) or the timeout expires (which with sufficiently big timeout effectively never happens)
> - we encountered the end -> tries to read for the timeout, and returns with an empty result
> After the change:
> - we have record(s) -> return immediately with a nonempty result
> - we encountered a gap -> return the result of the very first fetch, if the gap is bigger than that it returns an empty result otherwise nonempty result
> - we encountered the end -> tries to read for the timeout, and returns with an empty result
> As you can see before the change an empty result most likely meant that we have reached the end of the stream. After the change it has an additional meaning, and there is no way to make a distinction to figure out which scenario we encountered.
> `org.apache.kafka.clients.consumer.internals.Fetch` has that info, but when it gets converted to `org.apache.kafka.clients.consumer.ConsumerRecords` that context is lost.
> So a `Fetch` with isEmpty() == false produces a `ConsumerRecords` with isEmpty() == true.
>  
> ps.: For example another project where it causes issues: [https://github.com/apache/beam/blob/3efd3c38b9917ae1f51f466a237a79d9fe57b2a6/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L383C36-L388] (apache/beam allows the usage of any version of `kafka-clients` you prefer, so if you pick the 3.2.0 or any newer this issue might happen).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)