Posted to jira@kafka.apache.org by "Konstantine Karantasis (Jira)" <ji...@apache.org> on 2020/06/11 03:19:00 UTC

[jira] [Commented] (KAFKA-9841) Connector and Task duplicated when a worker join with old generation assignment

    [ https://issues.apache.org/jira/browse/KAFKA-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17132874#comment-17132874 ] 

Konstantine Karantasis commented on KAFKA-9841:
-----------------------------------------------

This fix is now merged. It looks like it can make it into {{2.5.1}}.
Thanks for checking, [~vvcephei], and thanks for the contribution, [~LucentWong].

> Connector and Task duplicated when a worker join with old generation assignment
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-9841
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9841
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.4.0, 2.3.1, 2.4.1
>            Reporter: Yu Wang
>            Assignee: Yu Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> The problem occurs when IncrementalCooperativeAssignor.class is used to assign connectors and tasks.
> Suppose a worker 'W' has a connection issue with the coordinator.
> During the connection issue, the connectors/tasks on 'W' are assigned to the other workers.
> When the connection issue clears, 'W' rejoins the group with an old generation assignment. The group leader then sees duplicated connectors/tasks in the metadata sent by the workers, but the duplicated connectors/tasks are not revoked.
>  
> Generation 3:
> Worker1:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker4:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[misc], taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>  
> Generation 4:
> Worker1:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:32:35,489] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Group coordinator xxxxxx:9092 (id: 2147483631 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2020-03-17 04:32:35,590] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Discovered group coordinator xxxxxx:9092 (id: 2147483631 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2020-03-17 04:32:36,910] INFO WorkerSourceTask\{id=misc-3} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-17 04:32:36,910] INFO WorkerSourceTask\{id=misc-3} flushing 86 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
> [2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> Worker4:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[misc], taskIds=[misc-3, misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>  
> Generation 5:
> Worker1:
> [2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:32:42,756] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker4:
> [2020-03-17 04:32:42,756] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[misc], taskIds=[misc-3, misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment\{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>  
>  
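
To make the symptom in the logs above concrete, here is a minimal Java sketch of how the duplication could be detected from the assignments that workers report when they join. It is not the merged fix and does not use any Kafka Connect API: the class DuplicateAssignmentCheck, the method findDuplicates, and the worker labels are hypothetical names chosen for illustration. The sample data assumes that at the generation 5 rebalance Worker3 still reports its generation 3 assignment (misc-3) while Worker4 reports the assignment it received at generation 4 (misc-3, misc-1).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka Connect.
class DuplicateAssignmentCheck {

    // Returns every task id reported by more than one worker,
    // mapped to the workers that reported it (in join order).
    static Map<String, List<String>> findDuplicates(Map<String, List<String>> reportedByWorker) {
        Map<String, List<String>> owners = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : reportedByWorker.entrySet()) {
            for (String taskId : entry.getValue()) {
                owners.computeIfAbsent(taskId, k -> new ArrayList<>()).add(entry.getKey());
            }
        }
        // Keep only the task ids that have two or more owners.
        owners.values().removeIf(workers -> workers.size() < 2);
        return owners;
    }

    public static void main(String[] args) {
        // Assumed view of the leader while computing generation 5:
        // Worker3 rejoins with its stale generation 3 assignment.
        Map<String, List<String>> reported = new LinkedHashMap<>();
        reported.put("Worker1", List.of("misc-0"));
        reported.put("Worker2", List.of("misc-4"));
        reported.put("Worker3", List.of("misc-3"));
        reported.put("Worker4", List.of("misc-3", "misc-1"));
        reported.put("Worker5", List.of("misc-5", "misc-2"));

        System.out.println(findDuplicates(reported));
    }
}
```

With the sample data, only misc-3 is reported by two workers (Worker3 and Worker4), which matches the generation 5 assignments in the logs, where both workers end up running misc-3 and neither has it in revokedTaskIds.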



--
This message was sent by Atlassian Jira
(v8.3.4#803005)