Posted to dev@kafka.apache.org by "Jiangjie Qin (JIRA)" <ji...@apache.org> on 2015/03/30 01:45:53 UTC

[jira] [Issue Comment Deleted] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

     [ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiangjie Qin updated KAFKA-1716:
--------------------------------
    Comment: was deleted

(was: [~dchu] Do you mean that the fetchers have never been created? That's a good point, but I still do not fully understand the cause.
The first rebalance of ZookeeperConsumerConnector occurs when the KafkaStreams are created. That means you need to specify a topic count map and create streams. The leader finder thread will then send a TopicMetadataRequest to the brokers to get the topic metadata for the topic. By default, auto topic creation is enabled on Kafka brokers. That means when a broker sees a TopicMetadataRequest asking for a topic that does not exist yet, it will create it and return the topic metadata. So the consumer fetcher thread will be created for the topic on ZookeeperConsumerConnector. However, if auto topic creation is turned off, your description looks possible.
About the shutdown issue: you are right, that is an issue that has been fixed in KAFKA-1848, but it seems not to be included in 0.8.2. I just changed the fix version from 0.9.0 to 0.8.3 instead.)

> hang during shutdown of ZookeeperConsumerConnector
> --------------------------------------------------
>
>                 Key: KAFKA-1716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1716
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: Sean Fay
>            Assignee: Neha Narkhede
>         Attachments: after-shutdown.log, before-shutdown.log, kafka-shutdown-stuck.log
>
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process.
> Shutdown thread:
> {code}    -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>     at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
>     at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>     at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
>     at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
>     at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
>     at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
>     ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
>     at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
>     at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
>     at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code}    -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>     at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>     at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
>     at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/utils/Utils$.inLock(Utils.scala:538)
>     at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
>     at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method){code}
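
The two traces above show a general pattern: one thread waits on a CountDownLatch for a worker to finish, while that worker is parked inside LinkedBlockingQueue.put() on a full queue that nobody drains. The following is a minimal sketch of that pattern only, not Kafka's code and not the actual fix. The class name ShutdownHangSketch, the method shutdownCompletes, and the thread name are hypothetical, and a timed await() is used (instead of the unbounded await() in the trace) so that the demonstration terminates.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; none of these names come from Kafka.
public class ShutdownHangSketch {

    /**
     * Starts a worker that blocks in put() on a full bounded queue, then
     * waits for it to finish. Returns true if the worker finished within
     * timeoutMs, false if it was still blocked.
     */
    public static boolean shutdownCompletes(boolean interruptWorker, long timeoutMs) {
        // Capacity 1 and no consumer: the second put() can never succeed.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        CountDownLatch queueFull = new CountDownLatch(1);
        CountDownLatch workerDone = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                queue.put("first");   // fills the queue
                queueFull.countDown();
                queue.put("second");  // blocks: nobody drains the queue
            } catch (InterruptedException e) {
                // put() is interruptible, so an interrupt lets the worker exit.
            } finally {
                workerDone.countDown();
            }
        }, "fetcher-sketch");
        worker.setDaemon(true); // a stuck worker must not keep the JVM alive
        worker.start();

        try {
            queueFull.await();
            if (interruptWorker) {
                worker.interrupt();
            }
            // The thread requesting shutdown waits here for the worker.
            return workerDone.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("without interrupt: " + shutdownCompletes(false, 500));
        System.out.println("with interrupt:    " + shutdownCompletes(true, 500));
    }
}
```

Without the interrupt the wait times out because the worker never leaves put(); with the interrupt, put() throws InterruptedException and the latch is released. Draining the queue, or using offer() with a timeout and re-checking a shutdown flag, would be other ways to unblock the worker in this sketch.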



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Re: [jira] [Issue Comment Deleted] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

Posted by Mayuresh Gharat <gh...@gmail.com>.
You can take a look at this:

https://issues.apache.org/jira/browse/KAFKA-1848

Thanks,

Mayuresh




-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125