You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Luke Chen (Jira)" <ji...@apache.org> on 2022/09/06 08:51:00 UTC

[jira] [Comment Edited] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

    [ https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17600675#comment-17600675 ] 

Luke Chen edited comment on KAFKA-14196 at 9/6/22 8:50 AM:
-----------------------------------------------------------

[~pnee] , thanks for the analysis. Yes, we forgot about during the following poll, the offset might advance while we're waiting for the old async offset commit completion.

Actually, while checking the code, even if we don't do the change for KAFKA-14024,and KAFKA-13310, (that is, changing sync commit to async commit) the issue will still happen, just not that easily. The issue is, in the consumer#poll process, we do onJoinPrepare (i.e. commit the offset), and then fetch new records. I'm thinking we should have a way to terminate poll process to avoid it keep fetching new records and return.

 

Maybe in `KafkaConsumer#updateAssignmentMetadataIfNeeded`, we passed in a parameter to allow the `onJoinPrepare` method to change the flag to notify if we need to terminate the poll and not to fetch records. WDYT?

cc [~guozhang] [~dajac]  [~aiquestion]


was (Author: showuon):
[~pnee] , thanks for the analysis. Yes, we forgot about during the following poll, the offset might advance while we're waiting for the old async offset commit completion. 

Actually, while checking the code, even if we don't do the change for KAFKA-14024,and KAFKA-13310, (that is, changing sync commit to async commit) the issue will still happen, just not that easily. The issue is, in the consumer#poll process, we do onJoinPrepare (i.e. commit the offset), and then fetch new records. I'm thinking we should have a way to terminate poll process to avoid it keep fetching new records and return.

 

Maybe in `KafkaConsumer#updateAssignmentMetadataIfNeeded`, we passed in a parameter to allow the `onJoinPrepare` method to change the flag to notify if we need to terminate the poll and not to fetch records. WDYT?

cc [~guozhang] [~aiquestion]

> Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14196
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14196
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.3.0, 3.2.1
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: new-consumer-threading-should-fix
>
> Several flaky tests under OffsetValidationTest are indicating potential consumer duplication issue, when autocommit is enabled.  I believe this is affecting *3.2* and onward.  Below shows the failure message:
>  
> {code:java}
> Total consumed records 3366 did not match consumed position 3331 {code}
>  
> After investigating the log, I discovered that the data consumed between the start of a rebalance event and the async commit was lost for those failing tests.  In the example below, the rebalance event kicks in at around 1662054846995 (first record), and the async commit of the offset 3739 is completed at around 1662054847015 (right before partitions_revoked).
>  
> {code:java}
> {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
> {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
> {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
> {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]} {code}
> A few things to note here:
>  # Manually calling commitSync in the onPartitionsRevoke cb seems to alleviate the issue
>  # Setting includeMetadataInTimeout to false also seems to alleviate the issue.
> The above tries seems to suggest that contract between poll() and asyncCommit() is broken.  AFAIK, we implicitly uses poll() to ack the previously fetched data, and the consumer would (try to) commit these offsets in the current poll() loop.  However, it seems like as the poll continues to loop, the "acked" data isn't being committed.
>  
> I believe this could be introduced in  KAFKA-14024, which originated from KAFKA-13310.
> More specifically, (see the comments below), the ConsumerCoordinator will alway return before async commit, due to the previous incomplete commit.  However, this is a bit contradictory here because:
>  # I think we want to commit asynchronously while the poll continues, and if we do that, we are back to KAFKA-14024, that the consumer will get rebalance timeout and get kicked out of the group.
>  # But we also need to commit all the "acked" offsets before revoking the partition, and this has to be blocked.
> *Steps to Reproduce the Issue:*
>  # Check out AK 3.2
>  # Run this several times: (Recommend to only run runs with autocommit enabled in consumer_test.py to save time)
> {code:java}
> _DUCKTAPE_OPTIONS="--debug" TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure" bash tests/docker/run_tests.sh {code}
>  
> *Steps to Diagnose the Issue:*
>  # Open the test results in *results/*
>  # Go to the consumer log.  It might look like this
>  
> {code:java}
> results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xxxxxxxxxx/dockerYY {code}
> 3. Find the docker instance that has partition getting revoked and rejoined.  Observed the offset before and after.
> *Propose Fixes:*
>  TBD



--
This message was sent by Atlassian Jira
(v8.20.10#820010)