You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by "Oleg Zhurakousky (JIRA)" <ji...@apache.org> on 2016/04/01 18:32:25 UTC

[jira] [Created] (NIFI-1722) Intermittent deadlocks in Kafka API when stopping GetKafka

Oleg Zhurakousky created NIFI-1722:
--------------------------------------

             Summary: Intermittent deadlocks in Kafka API when stopping GetKafka
                 Key: NIFI-1722
                 URL: https://issues.apache.org/jira/browse/NIFI-1722
             Project: Apache NiFi
          Issue Type: Bug
            Reporter: Oleg Zhurakousky
            Assignee: Oleg Zhurakousky
             Fix For: 0.7.0


It appears that Kafka gets into a deadlock state when _ConsumerConnector.commitOffsets(..)_ is executed during the stop call in GetKafka. Looking at the thread dump, it may be related to the fact that we are shutting down the consumer while onTrigger is still executing. But it also appears that onTrigger is in a deadlock state as well.
Below are the relevant thread dump segments:
{code}
"StandardProcessScheduler Thread-7" Id=115 BLOCKED  on java.lang.Object@2baae51
	at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
	at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)


"ConsumerFetcherThread-74993895-50e9-3962-90e3-97af1fed7294_daves-nifi-cluster-2-1459431214410-d3b5143c-0-1001" Id=25120 WAITING  on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@23dc03a2
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
	at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
	at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:53)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:142)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:126)
	at scala.Option.foreach(Option.scala:236)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:126)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:123)
	at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:123)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:122)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
	Number of Locked Synchronizers: 1
	- java.util.concurrent.locks.ReentrantLock$NonfairSync@6b99259f


"ConsumerFetcherThread-33156eec-156c-4b32-9598-d5fc3ca460ce_daves-nifi-cluster-2-1459519432175-5beddff4-0-1001" Id=35266 RUNNABLE  (in native code)
	at sun.nio.ch.Net.poll(Native Method)
	at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
	- waiting on java.lang.Object@3e91ba2b
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
	- waiting on java.lang.Object@494070b0
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	- waiting on sun.nio.ch.SocketAdaptor$SocketInputStream@3ade0aef
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
	- waiting on java.lang.Object@7d49c744
	at kafka.utils.CoreUtils$.read(CoreUtils.scala:192)
	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:78)
	- waiting on java.lang.Object@4eba7a24
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"Timer-Driven Process Thread-10" Id=92 TIMED_WAITING  on java.util.concurrent.CountDownLatch$Sync@7a46b8e8
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
	at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:67)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:48)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
	at org.apache.nifi.processors.kafka.KafkaPublisher.processAcks(KafkaPublisher.java:152)
	at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:141)
	at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299)
	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807)
	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778)
	at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:295)
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059)
	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
	Number of Locked Synchronizers: 1
	- java.util.concurrent.ThreadPoolExecutor$Worker@2bc3a748

"StandardProcessScheduler Thread-7" Id=115 BLOCKED  on java.lang.Object@2baae51
	at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
	at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)
	at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:110)
	at org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:297)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
	at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
	at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
	at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
	at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1332)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
	Number of Locked Synchronizers: 1
	- java.util.concurrent.ThreadPoolExecutor$Worker@36a2bc42
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)