Posted to user@spark.apache.org by Tobias Pfeiffer <tg...@preferred.jp> on 2014/06/05 11:40:32 UTC

How to shut down Spark Streaming with Kafka properly?

Hi,

I am trying to use Spark Streaming with Kafka, which works like a
charm -- except for shutdown. When I run my program with "sbt
run-main", sbt will never exit, because there are two non-daemon
threads left that don't die.

I created a minimal example at
<https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala>.
It starts a StreamingContext and does nothing more than connect to
a Kafka server and print what it receives. Using the `future { ... }`
construct, I shut down the StreamingContext after a few seconds and
then print the difference between the threads at start time and at
end time. The output can be found at
<https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1>.
There are a number of threads remaining that will prevent sbt from
exiting.
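
For reference, here is a condensed sketch of what the gist does (not a
verbatim copy; Spark 1.0-era API, and the ZooKeeper address, group id,
and topic name are placeholders):

    import scala.collection.JavaConversions._
    import scala.concurrent.future
    import scala.concurrent.ExecutionContext.Implicits.global
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object KafkaDoesntShutdown {
      def main(args: Array[String]) {
        // Remember which threads exist before streaming starts.
        val threadsBefore = Thread.getAllStackTraces.keySet.toSet

        val conf = new SparkConf()
          .setMaster("local[2]").setAppName("KafkaDoesntShutdown")
        val ssc = new StreamingContext(conf, Seconds(1))

        // Connect via ZooKeeper and print everything that arrives.
        val stream = KafkaUtils.createStream(ssc,
          "localhost:2181", "test-group", Map("test" -> 1))
        stream.print()

        // Stop the StreamingContext from another thread after a while.
        future {
          Thread.sleep(10000)
          ssc.stop(stopSparkContext = true, stopGracefully = true)
        }

        ssc.start()
        ssc.awaitTermination()

        // Any of these that are non-daemon will keep sbt from exiting.
        val threadsAfter = Thread.getAllStackTraces.keySet.toSet
        (threadsAfter -- threadsBefore).filter(_.isAlive).foreach { t =>
          println("leftover: " + t.getName + " (daemon: " + t.isDaemon + ")")
        }
      }
    }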

When I replace `KafkaUtils.createStream(...)` with a call that does
exactly the same, except that it calls `consumerConnector.shutdown()`
in `KafkaReceiver.onStop()` (which it should, IMO), the output is as
shown at <https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2>.
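
For clarity, the change amounts to something like this (a sketch, not
the exact code; in Spark's KafkaInputDStream, KafkaReceiver.onStop()
is currently a no-op, and `consumerConnector` is the connector created
in onStart()):

    def onStop() {
      // Shutting down the ConsumerConnector terminates Kafka's fetcher
      // and leader-finder threads, which otherwise outlive the receiver.
      if (consumerConnector != null) {
        consumerConnector.shutdown()
      }
    }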

Does anyone have *any* idea what is going on here and why the program
doesn't shut down properly? The behavior is the same with both Kafka
0.8.0 and 0.8.1.1, by the way.

Thanks
Tobias

Re: How to shut down Spark Streaming with Kafka properly?

Posted by Sean Owen <so...@cloudera.com>.
I closed that PR for other reasons. This change is still proposed by itself:

https://issues.apache.org/jira/browse/SPARK-2034
https://github.com/apache/spark/pull/980

On Fri, Jun 6, 2014 at 1:35 AM, Tobias Pfeiffer <tg...@preferred.jp> wrote:
> [...]

Re: How to shut down Spark Streaming with Kafka properly?

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Sean,

your patch fixes the issue, thank you so much! (This is the second
time within a week that I've run into network libraries not shutting
down their threads properly; I'm really glad your code fixes the
issue.)

I saw that your pull request was closed but not merged. Can I do
anything to get your fix into Spark? Open an issue, send a pull
request myself, etc.?

Thanks
Tobias

Re: How to shut down Spark Streaming with Kafka properly?

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Sean,

thanks for your link! I will try this ASAP!

On Thu, Jun 5, 2014 at 6:49 PM, Sean Owen <so...@cloudera.com> wrote:
> However I do seem to be able to shut down everything cleanly and
> terminate my (Java-based) program. I just call
> StreamingContext.stop(true, true). I don't know why it's different.

I think that's a peculiarity of sbt, cf.
http://www.scala-sbt.org/0.12.2/docs/Detailed-Topics/Running-Project-Code.html
If I call System.exit(), that will surely terminate my program (and
sbt) as well, but in general I think it's better not to leave these
threads lurking around ;-)
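
A small sketch to list the threads to blame, i.e. the live non-daemon
ones:

    import scala.collection.JavaConversions._

    // Any live non-daemon thread prevents a normal JVM exit.
    Thread.getAllStackTraces.keySet
      .filter(t => t.isAlive && !t.isDaemon)
      .foreach(t => println("non-daemon thread: " + t.getName))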

Thanks
Tobias

Re: How to shut down Spark Streaming with Kafka properly?

Posted by Sean Owen <so...@cloudera.com>.
Yes, I noted the same two issues -- there is an Executor that is never
shut down, and the ConsumerConnector is never shut down.

I foolishly tacked a change to this effect onto a different PR
(https://github.com/apache/spark/pull/926/files#diff-bf41e92a42a1bdb3bc1662fd9fc50fe2L38).
Maybe I can just propose this as a stand-alone change (and/or you can
try it separately).

However I do seem to be able to shut down everything cleanly and
terminate my (Java-based) program. I just call
StreamingContext.stop(true, true). I don't know why it's different.

But I think cleaning up the non-daemon threads from that pool is a
good change that should fix your issue. One sec...
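
Roughly, the cleanup under discussion looks like this (a sketch only,
assuming the receiver has references to the ConsumerConnector and to
the thread pool that runs the MessageHandlers; names are illustrative):

    import java.util.concurrent.ExecutorService
    import kafka.consumer.ConsumerConnector

    // Shut down the connector first so the Kafka streams end and the
    // handler tasks can finish; then release the pool's threads.
    def stopKafkaReceiver(connector: ConsumerConnector,
                          pool: ExecutorService) {
      if (connector != null) {
        connector.shutdown()
      }
      pool.shutdown()
    }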

On Thu, Jun 5, 2014 at 10:40 AM, Tobias Pfeiffer <tg...@preferred.jp> wrote:
> [...]