You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/07 14:22:07 UTC

[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-914348989


   @rhauch Overall that looks good to me. It's an elegant solution to the tricky problem you noted about the opacity of task-provided source offsets w/r/t ordering.
   
   I'm a little worried about offset commits taking longer and longer with the more sophisticated approach you proposed (where we would unconditionally iterate over every record in the batch, instead of only until the first unacknowledged record). It's true that there would be natural back pressure from the producer as its `buffer.memory` gets eaten up, but with the default of 32MB, it still seems possible for a large number of unacknowledged records to build up. If this does happen, then offset commits may end up exceeding the `offset.flush.timeout.ms` for the worker, which may cause issues with the current model where a single shared, worker-global thread is used for offset commits of all tasks.
   
   If this is a valid concern and we'd like to take it into account for now, I can think of a couple ways to handle it off the top of my head:
   1. Use the simpler approach that blocks offset commits across the board if a single record remains unacknowledged for a long period of time (which may realistically be a problem if a single partition out of many is unavailable for some reason).
   2. Enable concurrent offset commits by multiple tasks.
   3. Instead of a single dequeue per task, use a `ConcurrentMap<Map<String, ?>, Queue<SubmittedRecord>>` that stores a single dequeue per unique source partition. This would allow us to iterate over the bare minimum number of records for every single offset commit and not spend time, for example, on accumulated records for unavailable Kafka partitions. We'd still have to iterate over those records eventually if the Kafka partition came back online, but that iteration would only ever occur once, instead of once for every offset commit.
   
   I think option 3 may be warranted, although it's still possible that offset commits take a long time if 32MB worth of records end up getting queued. Option 2 may be worth implementing or at least considering as a follow-up item to handle this case.
   
   Thoughts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org