Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2014/08/06 18:41:12 UTC

[jira] [Commented] (KAFKA-1574) unit tests can hang on socketserver shutdown

    [ https://issues.apache.org/jira/browse/KAFKA-1574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14087882#comment-14087882 ] 

Jun Rao commented on KAFKA-1574:
--------------------------------

This issue seems to be a race condition in AbstractServerThread. During Processor startup, we first call startupComplete(), which sets alive to true. However, if Processor.shutdown() is called before startupComplete(), the latter sets alive back to true and the thread is never able to shut down.

For the fix, since an AbstractServerThread is not expected to be restarted, we can just set alive to true during initialization instead of in startupComplete().
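
A minimal Java sketch of the proposed fix, to make the ordering concrete. This is not the actual Kafka source (AbstractServerThread is Scala); the class ServerThreadSketch and the methods signalShutdown(), awaitShutdown() and isAlive() are hypothetical names used only for illustration, and the busy loop stands in for the selector loop in Processor.run().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for AbstractServerThread; not the actual Kafka source.
class ServerThreadSketch implements Runnable {
    // Proposed fix: alive starts as true and is only ever cleared afterwards.
    private final AtomicBoolean alive = new AtomicBoolean(true);
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // The racy variant would call alive.set(true) here, which could
    // overwrite a shutdown request that arrived before the thread started.
    void startupComplete() {
        // Intentionally does not touch alive.
    }

    // First half of shutdown(): ask the loop to stop.
    void signalShutdown() {
        alive.set(false);
    }

    // Second half of shutdown(): wait for run() to finish. The timeout is an
    // addition for this sketch so a regression returns false instead of hanging.
    boolean awaitShutdown(long timeoutMs) {
        try {
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean isAlive() {
        return alive.get();
    }

    @Override
    public void run() {
        startupComplete();
        while (alive.get()) {
            Thread.yield(); // placeholder for the selector poll loop
        }
        shutdownLatch.countDown();
    }

    public static void main(String[] args) {
        ServerThreadSketch t = new ServerThreadSketch();
        t.signalShutdown(); // shutdown requested before startupComplete()
        t.run();            // returns at once because alive stays false
        System.out.println("stopped: " + t.awaitShutdown(1000));
    }
}
```

With alive set only at construction, a shutdown request that arrives before startupComplete() is not lost, so the loop exits and the latch is released. If startupComplete() set alive to true instead, the same call order would leave run() looping forever.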


> unit tests can hang on socketserver shutdown
> --------------------------------------------
>
>                 Key: KAFKA-1574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1574
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8.2
>            Reporter: Jun Rao
>              Labels: newbie++
>
> Saw the following stacktrace.
> "kafka-network-thread-59843-2" prio=5 tid=7fc7e5943800 nid=0x11eefa000 runnable [11eef9000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
>         at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:136)
>         at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:69)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
>         - locked <7f4a80328> (a sun.nio.ch.Util$2)
>         - locked <7f4a80310> (a java.util.Collections$UnmodifiableSet)
>         - locked <7f4a71968> (a sun.nio.ch.KQueueSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
>         at kafka.network.Processor.run(SocketServer.scala:296)
>         at java.lang.Thread.run(Thread.java:695)
> "Test worker" prio=5 tid=7fc7e50d4800 nid=0x11534c000 waiting on condition [115349000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <7f4a69d50> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
>         at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:113)
>         at kafka.network.SocketServer$$anonfun$shutdown$2.apply(SocketServer.scala:92)
>         at kafka.network.SocketServer$$anonfun$shutdown$2.apply(SocketServer.scala:91)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>         at kafka.network.SocketServer.shutdown(SocketServer.scala:91)
>         at kafka.server.KafkaServer$$anonfun$shutdown$3.apply$mcV$sp(KafkaServer.scala:246)
>         at kafka.utils.Utils$.swallow(Utils.scala:172)
>         at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
>         at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
>         at kafka.utils.Logging$class.swallow(Logging.scala:94)
>         at kafka.utils.Utils$.swallow(Utils.scala:45)
>         at kafka.server.KafkaServer.shutdown(KafkaServer.scala:246)
>         at kafka.admin.AdminTest$$anonfun$testPartitionReassignmentNonOverlappingReplicas$3.apply(AdminTest.scala:232)
>         at kafka.admin.AdminTest$$anonfun$testPartitionReassignmentNonOverlappingReplicas$3.apply(AdminTest.scala:232)


