Posted to issues@camel.apache.org by "Claus Ibsen (JIRA)" <ji...@apache.org> on 2016/12/14 12:03:58 UTC

[jira] [Assigned] (CAMEL-10594) Kafka consumer stays alive when camel context is shut down

     [ https://issues.apache.org/jira/browse/CAMEL-10594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen reassigned CAMEL-10594:
-----------------------------------

    Assignee: Claus Ibsen

> Kafka consumer stays alive when camel context is shut down
> ----------------------------------------------------------
>
>                 Key: CAMEL-10594
>                 URL: https://issues.apache.org/jira/browse/CAMEL-10594
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>            Reporter: James Netherton
>            Assignee: Claus Ibsen
>             Fix For: 2.18.2, 2.19.0
>
>
> I happened to be running some camel-kafka unit tests with the log level set to DEBUG and noticed that the KafkaConsumer is not shut down correctly.
> When the Camel Kafka consumer is stopped, it invokes shutdownNow() on the ExecutorService. But this only attempts to stop running tasks (typically by interrupting them) and does not guarantee that any running threads will be terminated.
> This is a problem when Camel runs in a container like Karaf or WildFly, because the [KafkaFetchRecords|https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java#L94] thread just keeps on running for the lifetime of the JVM.
> It's simple to reproduce in a unit test:
> * Enable DEBUG log level
> * Start a Camel context with a Kafka consumer endpoint
> * Stop the Camel context
> * Call Thread.sleep for some time (10 seconds, for example), then look for an exception like this in the log output:
> {code}
> 07:09:44,247 DEBUG [org.apache.kafka.clients.NetworkClient] (Camel (camel-36) thread #134 - KafkaConsumer[test]) Error connecting to node 1 at localhost:9092:: java.nio.channels.ClosedByInterruptException
> 	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659)
> 	at org.apache.kafka.common.network.Selector.connect(Selector.java:168)
> 	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498)
> 	at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:48)
> 	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:645)
> 	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:974)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
> 	at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:130)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
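
The behaviour described above can be illustrated outside Camel and Kafka with a minimal, self-contained sketch. It is not the Camel code or the eventual fix; the class StubbornPoller, its running flag and both method names are hypothetical, and Thread.sleep stands in for a blocking poll that absorbs the interrupt (as the DEBUG-level ClosedByInterruptException in the log suggests). The first method shows that shutdownNow() alone leaves the worker alive; the second shows one possible remedy, signalling the loop to exit before shutting the executor down and then waiting for termination.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownSketch {

    // Hypothetical stand-in for a fetch loop whose blocking call absorbs interrupts.
    static class StubbornPoller implements Runnable {
        final AtomicBoolean running = new AtomicBoolean(true);
        final CountDownLatch started = new CountDownLatch(1);

        @Override
        public void run() {
            started.countDown();
            while (running.get()) {
                try {
                    Thread.sleep(20); // stands in for a blocking poll
                } catch (InterruptedException e) {
                    // The interrupt is absorbed here, so the loop keeps going.
                }
            }
        }
    }

    // Only shutdownNow(): the worker is interrupted but never leaves its loop.
    public static boolean terminatesAfterShutdownNowOnly() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        StubbornPoller poller = new StubbornPoller();
        executor.submit(poller);
        poller.started.await(); // make sure the task is actually running
        executor.shutdownNow();
        boolean terminated = executor.awaitTermination(300, TimeUnit.MILLISECONDS);
        // Clean up so this demo does not leak the thread itself.
        poller.running.set(false);
        executor.awaitTermination(2, TimeUnit.SECONDS);
        return terminated;
    }

    // Signal the loop to exit first, then shut down and wait for termination.
    public static boolean terminatesAfterFlagAndShutdownNow() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        StubbornPoller poller = new StubbornPoller();
        executor.submit(poller);
        poller.started.await();
        poller.running.set(false);
        executor.shutdownNow();
        return executor.awaitTermination(2, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("shutdownNow only: terminated=" + terminatesAfterShutdownNowOnly());
        System.out.println("flag + shutdownNow: terminated=" + terminatesAfterFlagAndShutdownNow());
    }
}
```

In a real Kafka consumer loop, a flag alone may not be enough if the thread is blocked inside poll(); the Kafka client's KafkaConsumer.wakeup() method exists for breaking out of a blocking poll from another thread, but whether and how camel-kafka should use it is outside what this sketch shows.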



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)