Posted to users@kafka.apache.org by Daniel Compton <de...@danielcompton.net> on 2014/06/30 08:36:56 UTC

Stopping thread on consumer timeout

I'm using Clojure to test and reconcile the results of MirrorMaker (MM) replication between two Kafka clusters across an unreliable (Internet) link. In this case, we run our production tests, wait for MM replication to finish, then drain the topics on both sides of the network and compare the messages. I think this differs from the standard Kafka use case in that the task has a stopping point.

I'm able to drain all of the messages from a particular topic on one cluster, and then, after my timeout period, I get a timeout exception. So far so good. After adding more data to the topic, I tried to kick the job off again, but I just got a timeout exception the second time I ran the consumer. The logs showed that the connection was closed by the broker, logged at INFO level, i.e. there were no errors on the broker side.

Looking at the running threads, I can see half a dozen active threads that look like Kafka consumers and connections to ZooKeeper. I would have thought that they would shut down after the timeout, but they haven't. What is the correct way to handle Kafka consumer threads and shut them down when you are finished with them? In a long-running Kafka job, how would I restart a consumer process after a timeout? Is a try/catch the idiomatic way to handle this?

Thanks,

---
Daniel

Re: Stopping thread on consumer timeout

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Daniel,

If you do not want the consumer to stop when a timeout exception is thrown
because no more data is arriving, then wrap the consuming code in a
try/catch for that exception. However, throwing the timeout exception does
not necessarily stop the background fetcher threads. If you really want to
shut down the consumer, you need to call the shutdown() API.
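
As a rough illustration of that pattern, here is a minimal, self-contained
Java sketch. It does not use the Kafka client at all: StubConsumer,
TimeoutReached, and drainOnce are hypothetical stand-ins invented for this
example, with a plain thread playing the role of the background fetcher. The
assumption is that in the real high-level consumer the timeout exception and
shutdown() play the same roles as shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class DrainExample {

    /** Hypothetical stand-in for the consumer's timeout exception. */
    static class TimeoutReached extends RuntimeException {}

    /** Hypothetical stand-in for a consumer that owns a background fetcher thread. */
    static class StubConsumer {
        private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
        private final Thread fetcher;
        private final long timeoutMs;

        StubConsumer(List<String> source, long timeoutMs) {
            this.timeoutMs = timeoutMs;
            this.fetcher = new Thread(() -> {
                buffer.addAll(source); // "fetch" everything currently available
                try {
                    Thread.sleep(Long.MAX_VALUE); // then idle, waiting for more data
                } catch (InterruptedException e) {
                    // interrupted by shutdown(): let the thread exit
                }
            }, "stub-fetcher");
            this.fetcher.start();
        }

        /** Returns the next message, or throws TimeoutReached if none arrives in time. */
        String next() {
            try {
                String message = buffer.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (message == null) {
                    throw new TimeoutReached();
                }
                return message;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new TimeoutReached();
            }
        }

        /** Stops the background fetcher; a timeout alone does not do this. */
        void shutdown() {
            fetcher.interrupt();
            try {
                fetcher.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        boolean fetcherAlive() {
            return fetcher.isAlive();
        }
    }

    /** Drains everything available, treats the timeout as "done", and always shuts down. */
    static List<String> drainOnce(List<String> source, long timeoutMs) {
        StubConsumer consumer = new StubConsumer(source, timeoutMs);
        List<String> drained = new ArrayList<>();
        try {
            while (true) {
                drained.add(consumer.next());
            }
        } catch (TimeoutReached e) {
            // Expected: no more data arrived within the timeout.
        } finally {
            consumer.shutdown(); // release the background thread before any later run
        }
        return drained;
    }

    public static void main(String[] args) {
        // Each run builds a fresh consumer, so a later run is not affected by the earlier timeout.
        System.out.println(drainOnce(Arrays.asList("a", "b", "c"), 200));
        System.out.println(drainOnce(Arrays.asList("d"), 200));
    }
}
```

The design choice is to treat the timeout as the normal end of a drain, put
shutdown() in a finally block so the fetcher never outlives the run, and
create a new consumer for each subsequent run instead of reusing one that
has already timed out.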

Guozhang





