Posted to jira@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2019/05/08 21:53:00 UTC

[jira] [Commented] (KAFKA-8339) At-least-once delivery guarantee seemingly not met due to async commit / produce failure race condition

    [ https://issues.apache.org/jira/browse/KAFKA-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835931#comment-16835931 ] 

Matthias J. Sax commented on KAFKA-8339:
----------------------------------------

{quote}The race condition occurs when the stream thread commits its offsets for topic T1 after it consumes some or all of the necessary records from topic T1 for an aggregation but before it gets the failure response back from the async produce kicked off by KSTREAM-SINK-NODE1.
{quote}
This does not sound correct. Before offsets are committed, `producer.flush()` is called and all pending in-flight requests should be completed. Offsets should only be committed if no error occurred during `flush()`. Can you confirm your observation? If your description is correct, we need to figure out why we still commit even though an error occurred.

Also, offsets are not committed asynchronously but via `consumer.commitSync()`.
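For reference, the ordering described above can be sketched roughly as follows. This is a simplified model with stubbed types, not the actual org.apache.kafka.streams internals; the method name `maybeCommit` is hypothetical:

```java
import java.util.Map;

// Simplified sketch of the flush-then-commit ordering (stubbed types;
// not the real Kafka Streams code path).
public class CommitPath {
    interface Producer { void flush(); }                         // throws if any send failed
    interface Consumer { void commitSync(Map<String, Long> offsets); }

    static void maybeCommit(Producer producer, Consumer consumer,
                            Map<String, Long> offsets) {
        producer.flush();             // blocks until all in-flight sends complete;
                                      // a failed send surfaces as an exception here
        consumer.commitSync(offsets); // reached only if flush() raised no error
    }
}
```

Under this ordering, a produce failure raised during `flush()` prevents the commit from ever being issued, which is why the reported behavior is surprising.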
{quote}LogAndFailExceptionHandler
{quote}
This handler is for the input path and should only be called if there is a deserialization exception. Thus, I don't see how it is related to the other behavior reported here. Can you elaborate?
{quote}so when the stream thread tries to commit the next time it fails and the stream thread shuts itself down.
{quote}
I don't see the causality here. Why would the commit fail? Also, what error message do you see on the failed commit?

> At-least-once delivery guarantee seemingly not met due to async commit / produce failure race condition
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8339
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8339
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.1
>            Reporter: tdp
>            Priority: Major
>
> We have hit a race condition several times now between the StreamThread committing its offsets for a task before the task has fully processed the record through the topology.
>  
> Consider part of a topology that looks like this:
>  
> TOPIC T1 -> KSTREAM-SOURCE-NODE1 -> KSTREAM-TRANSFORMVALUES-NODE1 -> KSTREAM-FILTER-NODE1 -> KSTREAM-MAPVALUES-NODE1 -> KSTREAM-SINK-NODE1 -> TOPIC T2
>  
> Records are committed to topic T1. KSTREAM-SOURCE-NODE1 consumes these records from topic T1. KSTREAM-TRANSFORMVALUES-NODE1 aggregates them using a local state store, returning null if not all necessary records from topic T1 have been consumed yet, or an object representing the aggregation once they all have. KSTREAM-FILTER-NODE1 then filters out anything that is null, so only a completed aggregation is passed on to KSTREAM-MAPVALUES-NODE1, which maps the aggregation into another object type. KSTREAM-SINK-NODE1 then attempts to produce this object to topic T2.
>  
> The race condition occurs when the stream thread commits its offsets for topic T1 after it consumes some or all of the necessary records from topic T1 for an aggregation but before it gets the failure response back from the async produce kicked off by KSTREAM-SINK-NODE1.
>  
> We are running with a LogAndFailExceptionHandler, so when the stream thread tries to commit the next time it fails and the stream thread shuts itself down. The stream task is then reassigned to another stream thread, which reads the offsets previously committed by the original stream thread. That means the new stream thread's KSTREAM-SOURCE-NODE1 will never be able to consume the messages required for the aggregation and the KSTREAM-SINK-NODE1 will never end up producing the required records to topic T2. This is why it seems the at-least-once delivery guarantee is not met - KSTREAM-SINK-NODE1 never successfully processed records and the stream application continued on past it.
> Note: we are running with StreamsConfig.RETRIES_CONFIG set to 10, which increases the likelihood of the issue when all retries fail, since retrying widens the window during which the async offset commit can occur before the produce record request is finally marked as failed.
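The timeline the reporter describes can be modeled as a toy sketch. This is plain Java, not Kafka code, and every name in it is hypothetical; it only illustrates how a commit that fires before the asynchronous produce failure is observed would leave the committed position past the lost record:

```java
import java.util.concurrent.CompletableFuture;

// Toy model of the claimed race: the offset commit happens before the
// failure response of the async produce is observed. Not Kafka code.
public class RaceTimeline {
    static long committedOffset = -1;
    static boolean produceFailureObserved = false;

    static void run() {
        long consumedOffset = 42;                    // last record of the aggregation, read from T1
        CompletableFuture<Void> produceResult = new CompletableFuture<>();

        committedOffset = consumedOffset;            // commit interval elapses: offset committed first

        produceResult.completeExceptionally(         // broker failure response arrives only afterwards
                new RuntimeException("produce to T2 failed"));
        produceFailureObserved = produceResult.isCompletedExceptionally();
        // committedOffset now points past a record whose downstream produce failed,
        // so a rebalanced thread resumes after it and the aggregation can never complete.
    }
}
```

If Streams really does flush and check for errors before committing, as the comment above argues, this interleaving should not be possible, which is the open question in this ticket.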



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)