Posted to jira@kafka.apache.org by "Manikumar (JIRA)" <ji...@apache.org> on 2018/04/18 11:07:00 UTC

[jira] [Resolved] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

     [ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-1716.
------------------------------
    Resolution: Auto Closed

Closing inactive issue. The Scala consumers have been deprecated and no further work is planned. Please upgrade to the Java consumer whenever possible.

> hang during shutdown of ZookeeperConsumerConnector
> --------------------------------------------------
>
>                 Key: KAFKA-1716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1716
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: Sean Fay
>            Assignee: Neha Narkhede
>            Priority: Major
>         Attachments: after-shutdown.log, before-shutdown.log, kafka-shutdown-stuck.log
>
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to hang if some consumer fetcher threads receive messages during the shutdown process.
> Shutdown thread:
> {code}    -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>     at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
>     at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>     at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
>     at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
>     at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
>     at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
>     ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
>     at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
>     at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
>     at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code}    -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>     at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>     at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
>     at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/utils/Utils$.inLock(Utils.scala:538)
>     at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
>     at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method){code}
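
The two traces above have a simple shape: one thread waits on a CountDownLatch while the thread that should count it down is parked in LinkedBlockingQueue.put() on a full queue. The following is a minimal Java sketch of that shape only, not Kafka's code. The class name ShutdownHangSketch, the method shutdownCompletes, the queue capacity, and the timeouts are all hypothetical. It also assumes the blocked thread is not interrupted before the latch wait, which the traces alone do not establish. The second mode shows one common way to avoid this kind of wait: a timed offer() that re-checks a running flag.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a fetcher thread and its shutdown path; not Kafka's classes.
class ShutdownHangSketch {

    // Starts one "fetcher" against a full queue that nobody drains, then
    // requests shutdown. Returns true if the fetcher signalled the latch
    // within waitMs, false if the shutdown wait timed out.
    static boolean shutdownCompletes(boolean boundedEnqueue, long waitMs)
            throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.put("chunk-0"); // queue is now full
        AtomicBoolean running = new AtomicBoolean(true);
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        CountDownLatch enqueueStarted = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            try {
                enqueueStarted.countDown();
                if (boundedEnqueue) {
                    // Timed offer: wake up every 10 ms and re-check the flag.
                    while (running.get()
                            && !queue.offer("chunk-1", 10, TimeUnit.MILLISECONDS)) {
                        // retry until enqueued or shutdown is requested
                    }
                } else {
                    // Unbounded wait: parks until space frees up or an interrupt arrives.
                    queue.put("chunk-1");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown();
            }
        });
        fetcher.setDaemon(true);
        fetcher.start();
        enqueueStarted.await();

        // Shutdown request: clear the flag, then wait for the fetcher to finish.
        // The wait is bounded here only so that the sketch terminates.
        running.set(false);
        boolean completed = shutdownLatch.await(waitMs, TimeUnit.MILLISECONDS);
        fetcher.interrupt(); // cleanup so the blocked thread does not linger
        return completed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("blocking put:  completed = " + shutdownCompletes(false, 300));
        System.out.println("bounded offer: completed = " + shutdownCompletes(true, 300));
    }
}
```

With the blocking put() the latch wait times out, because clearing the flag does nothing for a thread already parked on the queue. With the timed offer() the fetcher notices the flag within one polling interval and the wait returns promptly.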



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)