Posted to dev@kafka.apache.org by "Chris Egerton (Jira)" <ji...@apache.org> on 2023/01/23 19:17:00 UTC

[jira] [Resolved] (KAFKA-14311) Connect Worker clean shutdown does not cleanly stop connectors/tasks

     [ https://issues.apache.org/jira/browse/KAFKA-14311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton resolved KAFKA-14311.
-----------------------------------
    Resolution: Fixed

> Connect Worker clean shutdown does not cleanly stop connectors/tasks
> --------------------------------------------------------------------
>
>                 Key: KAFKA-14311
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14311
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.3.1
>            Reporter: Greg Harris
>            Assignee: Sagar Rao
>            Priority: Minor
>             Fix For: 3.5.0
>
>
> When the DistributedHerder::stop() method is called, it triggers asynchronous shutdown of the background herder thread, and continues with synchronous shutdown of some other resources, including the stopAndStartExecutor.
> This executor is responsible for cleanly stopping connectors and tasks, which it does in the DistributedHerder::halt() method. There is a race condition between the halt() method asynchronously submitting these connector/task stop jobs and the stop() method terminating the executor. If the executor is terminated first, this exception appears:
> {noformat}
> [2022-10-17 16:29:23,396] ERROR [Worker clientId=connect-2, groupId=connect-integration-test-connect-cluster-1] Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:366)
> java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@62878e25[Not completed, task = org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$2285/0x00000008015046a8@58deade3] rejected from java.util.concurrent.ThreadPoolExecutor@10351ac3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
>     at java.base/java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:247)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startAndStop(DistributedHerder.java:1667)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.halt(DistributedHerder.java:765)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:361)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:833){noformat}
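
The rejection at the top of the trace above can be reproduced in isolation with plain java.util.concurrent. What follows is a minimal sketch, not Connect code: the class name ShutdownRaceSketch, the method rejectedAfterShutdown() and the no-op stopJob are hypothetical stand-ins, and the executor is shut down deliberately before submission to model the case where stop() wins the race against halt().

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class ShutdownRaceSketch {

    // Returns true if submitting work to an already shut down executor is rejected.
    // Hypothetical helper; it only mirrors the invokeAll() call seen in the stack trace.
    static boolean rejectedAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // Models stop() terminating the executor before halt() submits its jobs.
        executor.shutdown();

        // Stand-in for a connector/task stop job (assumption: a no-op here).
        Callable<Void> stopJob = () -> null;

        try {
            // Models halt() submitting stop jobs after the executor is gone.
            executor.invokeAll(List.of(stopJob));
            return false;
        } catch (RejectedExecutionException e) {
            // Default AbortPolicy: the executor refuses new work after shutdown.
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + rejectedAfterShutdown());
    }
}
```

In the real herder the ordering is not fixed, which is why the error only shows up intermittently; the sketch forces the losing order so the behavior is deterministic.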



--
This message was sent by Atlassian Jira
(v8.20.10#820010)