Posted to jira@kafka.apache.org by "Sagar Rao (Jira)" <ji...@apache.org> on 2023/05/18 10:27:00 UTC

[jira] [Commented] (KAFKA-15005) Status of KafkaConnect task not correct

    [ https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17723885#comment-17723885 ] 

Sagar Rao commented on KAFKA-15005:
-----------------------------------

Hey [~LucentWong], thanks for reporting this. It looks very similar to https://issues.apache.org/jira/browse/KAFKA-12525, which has been present for a while. There's also a PR submitted for that bug.

> Status of KafkaConnect task not correct
> ---------------------------------------
>
>                 Key: KAFKA-15005
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15005
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.5.1, 3.0.0, 3.3.2
>            Reporter: Yu Wang
>            Priority: Major
>
> Our MM2 is running version 2.5.1.
> After a rebalance of our MM2 source tasks, we found that several tasks stayed in *UNASSIGNED* status even though the actual tasks had already started.
> So we dumped the payload of the Kafka Connect status topic and found that the last two status changes were *RUNNING* followed by *UNASSIGNED*.
> {code:java}
> LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 headerKeys: [] key: task-7 payload: {"state":"RUNNING","trace":null,"worker_id":"xxxxx","generation":437643}
> LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 headerKeys: [] key: task-7 payload: {"state":"UNASSIGNED","trace":null,"worker_id":"xxxxx","generation":437643}
>  {code}
> Normally, the RUNNING status should be appended after UNASSIGNED, because the worker coordinator revokes tasks before starting new ones.
> We then checked the logs of our MM2 workers and found that, during that time, a task (task-7) was revoked on worker-2 and started on worker-1.
>  
> Worker-1
> {code:java}
> [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, groupId=xxxx__group] Starting task task-7 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2023-05-15 09:24:45,951] INFO Creating task task-7 (org.apache.kafka.connect.runtime.Worker) {code}
> Worker-2
> {code:java}
> [2023-05-15 09:24:40,922] INFO Stopping task task-7 (org.apache.kafka.connect.runtime.Worker) {code}
>  
> So I think the incorrect status was caused by the revoked task finishing later than the newly started task, which caused the UNASSIGNED status to be appended to the status topic after the RUNNING status.
>  
> After reading the code of DistributedHerder, I found that task revocation runs in a thread pool, and the revoke operation returns as soon as all the callables have been submitted. So I think that even within the same worker there is no guarantee that the revoke operation always finishes before the new tasks start.
> {code:java}
> for (final ConnectorTaskId taskId : tasks) {
>     callables.add(getTaskStoppingCallable(taskId));
> }
> // The actual timeout for graceful task/connector stop is applied in worker's
> // stopAndAwaitTask/stopAndAwaitConnector methods.
> startAndStop(callables);
> log.info("Finished stopping tasks in preparation for rebalance"); {code}
>  
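The ordering suspected in the description above (a stop that completes after the replacement task has already reported RUNNING) can be reproduced in isolation. The following is a minimal sketch using only java.util.concurrent; it is not Kafka Connect code. The class StatusOrderingSketch, its methods, and the in-memory list standing in for the status topic are all hypothetical, and a latch is used to force the slow shutdown deterministically. It makes no claim about whether startAndStop itself blocks, since that is not visible in the snippet quoted above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; none of these names exist in Kafka Connect.
class StatusOrderingSketch {

    // A reader that keeps only the most recent record for a task,
    // which is how a "latest status wins" view would behave.
    static String latestStatus(List<String> log) {
        return log.isEmpty() ? null : log.get(log.size() - 1);
    }

    // Simulates one task being stopped by its old owner while a new owner starts it.
    // The returned list stands in for the status topic (append-only, in arrival order).
    static List<String> simulate() throws Exception {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService stopExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch slowShutdown = new CountDownLatch(1);
        try {
            // Old owner: the stop is submitted to a pool and NOT awaited here.
            Future<?> stop = stopExecutor.submit(() -> {
                slowShutdown.await();   // models a task that takes a while to stop
                log.add("UNASSIGNED");  // status is written only once the stop completes
                return null;
            });

            // New owner: starts the task and reports RUNNING in the meantime.
            log.add("RUNNING");

            // The old task finally finishes stopping, after RUNNING was written.
            slowShutdown.countDown();
            stop.get();
        } finally {
            stopExecutor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws Exception {
        List<String> log = simulate();
        System.out.println(log + " -> " + latestStatus(log));
    }
}
```

In this sketch the log ends with UNASSIGNED even though the task is running, which matches the two records dumped from the status topic. Waiting on the stop future before writing RUNNING would restore the expected order within a single process, but it would not help when the stop and the start happen on different workers, as in the worker-1 and worker-2 logs.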



--
This message was sent by Atlassian Jira
(v8.20.10#820010)