Posted to dev@kafka.apache.org by "Sébastien Launay (JIRA)" <ji...@apache.org> on 2016/04/04 22:23:25 UTC

[jira] [Updated] (KAFKA-3501) Console consumer process hangs on SIGINT

     [ https://issues.apache.org/jira/browse/KAFKA-3501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sébastien Launay updated KAFKA-3501:
------------------------------------
    Attachment: jstack.txt

> Console consumer process hangs on SIGINT
> ----------------------------------------
>
>                 Key: KAFKA-3501
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3501
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, core
>    Affects Versions: 0.9.0.0
>         Environment: Ubuntu 12.04.5 LTS
> OpenJDK Runtime Environment (IcedTea 2.6.3) (7u91-2.6.3-0ubuntu0.12.04.1)
> OpenJDK 64-Bit Server VM (build 24.91-b01, mixed mode)
>            Reporter: Sébastien Launay
>            Assignee: Neha Narkhede
>         Attachments: jstack.txt
>
>
> Sometimes, when the `kafka-console-consumer` script runs inside a pipe and is interrupted with `SIGINT` (`Ctrl+C`), the process does not stop.
> {noformat}
> ubuntu@xxx:~$ kafka-console-consumer --zookeeper localhost --topic topic --from-beginning | grep "pattern"
> record1
> ...
> recordN
> ^CUnable to write to standard out, closing consumer.
>  ^C
> # process is still running
> {noformat}
> Looking at the threads running in the JVM, I noticed that one user thread is waiting on a latch, which prevents the JVM from shutting down:
> {noformat}
> ...
> "Thread-6" prio=10 tid=0x00007f258c016000 nid=0x289f waiting on condition [0x00007f259aee3000]
>    java.lang.Thread.State: WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x00000000d6640c80> (a java.util.concurrent.CountDownLatch$Sync)
> 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> 	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
> 	at kafka.tools.ConsoleConsumer$$anon$1.run(ConsoleConsumer.scala:101)
> ...
> "main" prio=10 tid=0x00007f25c400e000 nid=0x2878 waiting for monitor entry [0x00007f25cbefc000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> 	at java.lang.Shutdown.exit(Shutdown.java:212)
> 	- waiting to lock <0x00000000d6974308> (a java.lang.Class for java.lang.Shutdown)
> 	at java.lang.Runtime.exit(Runtime.java:109)
> 	at java.lang.System.exit(System.java:962)
> 	at kafka.tools.ConsoleConsumer$.checkErr(ConsoleConsumer.scala:149)
> 	at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:136)
> 	at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
> 	at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:47)
> 	at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {noformat}
> I believe the standard output connected to the defunct grep process gets closed and triggers a `System.exit(1)` that prevents the latch from being counted down, so the main thread hangs forever:
> {code:scala}
>   def checkErr(formatter: MessageFormatter) {
>     if (System.out.checkError()) {
>       // This means no one is listening to our output stream any more, time to shutdown
>       System.err.println("Unable to write to standard out, closing consumer.")
>       formatter.close()
>       System.exit(1)
>     }
>   }
> {code}
> This only happens when `System.out` is checked for errors after a message is consumed and before the consumer is closed. It is a race condition that is most likely to occur when messages are consumed at high throughput.
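The hang described above can be modeled with ordinary threads. The following is a minimal Java sketch, not the actual `ConsoleConsumer` code. All names in it are hypothetical: `shutdownLock` stands in for the `java.lang.Shutdown` monitor that `main` is blocked on in the trace, the `hook` thread plays the role of `Thread-6`, and the latch wait uses a timeout so the demo terminates instead of hanging. It also assumes that the latch is only counted down by the main thread once it leaves its processing loop normally.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified model of the suspected deadlock; every name here is illustrative.
class ShutdownLatchModel {

    /**
     * Runs one "hook" thread and one "main" thread.
     * Returns true if the hook saw the latch released before its timeout.
     */
    static boolean simulate(final boolean exitBeforeCountDown) {
        final Object shutdownLock = new Object();            // stand-in for the Shutdown monitor
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        final CountDownLatch hookRunning = new CountDownLatch(1);
        final boolean[] released = new boolean[1];

        // Plays the role of "Thread-6": shutdown is in progress (lock held)
        // and the hook waits for the main thread to finish its cleanup.
        Thread hook = new Thread(() -> {
            synchronized (shutdownLock) {
                hookRunning.countDown();
                try {
                    // The real hook waits without a timeout; the timeout keeps the demo finite.
                    released[0] = shutdownLatch.await(300, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // Plays the role of "main".
        Thread main = new Thread(() -> {
            try {
                hookRunning.await();
                if (exitBeforeCountDown) {
                    // Models System.exit(1) called while shutdown is already running:
                    // it blocks on the lock and never reaches the cleanup below.
                    synchronized (shutdownLock) { }
                    return;
                }
                shutdownLatch.countDown();                   // normal cleanup path
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            hook.start();
            main.start();
            hook.join();
            main.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return released[0];
    }

    public static void main(String[] args) {
        System.out.println("exit before countDown, hook released: " + simulate(true));
        System.out.println("normal return, hook released: " + simulate(false));
    }
}
```

In this model the hook is released only when the main thread returns normally and counts the latch down. When the main thread instead blocks on the lock, as `System.exit` does during an ongoing shutdown, the hook's wait is never satisfied.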



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)