Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2017/10/19 05:21:01 UTC

[jira] [Resolved] (KAFKA-6085) Streams rebalancing may cause a first batch of fetched records to be dropped

     [ https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-6085.
----------------------------------
       Resolution: Not A Problem
    Fix Version/s:     (was: 1.0.0)

This is actually not a bug; the problem was only introduced in https://github.com/apache/kafka/pull/4085, which tries to improve restoration latency. Resolving this as Not A Problem.

> Streams rebalancing may cause a first batch of fetched records to be dropped
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6085
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6085
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.1
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Blocker
>
> This is a regression introduced in KAFKA-5152:
> Assume you have one task without any state stores (and hence no restoration needed for that task), and a rebalance happens during a {{records = pollRequests(pollTimeMs);}} call:
> 1. We name this `pollRequests` call A. Within call A the rebalance happens, which moves the thread state from RUNNING to PARTITION_REVOKED, and then from PARTITION_REVOKED to PARTITION_ASSIGNED. Assume the same task gets assigned again: this task will be in the initialized set of tasks but NOT in the running tasks yet.
> 2. Within the same call A, a fetch request may be sent and a response carrying a batch of records may arrive; that batch is then returned from `pollRequests`. At this point the thread state has become PARTITION_ASSIGNED and the task is not "running" yet.
> 3. Now the bug comes in this line:
> {{!records.isEmpty() && taskManager.hasActiveRunningTasks()}}
> Since the task is not in the active running set yet, this returned batch of records is skipped. Effectively these records are dropped on the floor and will never be consumed again.
> 4. In the next run loop, the same `pollRequests()` will be called again; let's call it B. After B is called, we set the thread state to RUNNING and move the task into the running task set. But at this point the previous batch of records will not be returned anymore.
> So the bug lies in the fact that, within a single run loop of the stream thread, we may complete a rebalance with tasks assigned but not yet initialized, AND fetch a batch of records for that not-yet-initialized task and drop them on the floor.
> With further investigation I can confirm that this bug is also the root cause of the new flaky test https://issues.apache.org/jira/browse/KAFKA-5140. A recent PR, https://github.com/apache/kafka/pull/4086, exposed this bug by making the reset integration test fail more frequently.
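The sequence in steps 1-4 can be reproduced with a small, self-contained Java toy model. Everything here (RebalanceDropSketch, ToyThread, pollWithRebalance, pollAfterRebalance, handle, the task id 0_0) is hypothetical and is NOT Kafka's StreamThread or TaskManager code; only the guard condition mirrors the one quoted in step 3. The second variant, which holds a fetched batch until its task is running, is an illustrative assumption and not the change made in Kafka.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RebalanceDropSketch {

    enum State { RUNNING, PARTITION_REVOKED, PARTITION_ASSIGNED }

    // Hypothetical toy model of one stream thread; NOT Kafka's StreamThread/TaskManager.
    static final class ToyThread {
        State state = State.RUNNING;
        final Set<String> assignedNotRunning = new HashSet<>(); // "initialized" but not yet running
        final Set<String> runningTasks = new HashSet<>();
        final List<String> processed = new ArrayList<>();
        final List<String> pending = new ArrayList<>();          // used only by the buffering variant
        final boolean bufferUntilRunning;

        ToyThread(boolean bufferUntilRunning) {
            this.bufferUntilRunning = bufferUntilRunning;
            runningTasks.add("0_0"); // one stateless task, so no restoration is needed
        }

        // Call A: a rebalance completes inside the poll and a fetch response also arrives.
        List<String> pollWithRebalance() {
            state = State.PARTITION_REVOKED;
            runningTasks.clear();
            state = State.PARTITION_ASSIGNED;
            assignedNotRunning.add("0_0"); // same task reassigned, but not running yet
            return List.of("r1", "r2");    // the toy consumer will never return r1, r2 again
        }

        // Call B: the task is promoted to running, but no new records are returned.
        List<String> pollAfterRebalance() {
            if (state == State.PARTITION_ASSIGNED) {
                runningTasks.addAll(assignedNotRunning);
                assignedNotRunning.clear();
                state = State.RUNNING;
            }
            return List.of();
        }

        boolean hasActiveRunningTasks() {
            return !runningTasks.isEmpty();
        }

        // What one run-loop iteration does with the records returned by the poll.
        void handle(List<String> records) {
            if (!bufferUntilRunning) {
                // Guard from the report: a batch that arrives while no task is running is discarded.
                if (!records.isEmpty() && hasActiveRunningTasks()) {
                    processed.addAll(records);
                }
                return;
            }
            // Hypothetical alternative: keep the batch until its task is running.
            pending.addAll(records);
            if (!pending.isEmpty() && hasActiveRunningTasks()) {
                processed.addAll(pending);
                pending.clear();
            }
        }
    }

    static List<String> simulate(boolean bufferUntilRunning) {
        ToyThread thread = new ToyThread(bufferUntilRunning);
        thread.handle(thread.pollWithRebalance());  // run loop with call A
        thread.handle(thread.pollAfterRebalance()); // run loop with call B
        return thread.processed;
    }

    public static void main(String[] args) {
        System.out.println("guard only:  " + simulate(false)); // prints "guard only:  []"
        System.out.println("with buffer: " + simulate(true));  // prints "with buffer: [r1, r2]"
    }
}
```

Under these assumptions the guard-only loop processes nothing: call A's batch arrives while no task is running, and call B returns nothing new. The buffering variant delivers r1 and r2 once the task is promoted during call B.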



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)