You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by "Oleg Zhurakousky (JIRA)" <ji...@apache.org> on 2016/04/01 18:32:25 UTC
[jira] [Created] (NIFI-1722) Intermittent deadlocks in Kafka API
when when stopping GetKafka
Oleg Zhurakousky created NIFI-1722:
--------------------------------------
Summary: Intermittent deadlocks in Kafka API when when stopping GetKafka
Key: NIFI-1722
URL: https://issues.apache.org/jira/browse/NIFI-1722
Project: Apache NiFi
Issue Type: Bug
Reporter: Oleg Zhurakousky
Assignee: Oleg Zhurakousky
Fix For: 0.7.0
It appears that Kafka gets in the state of deadlock when _ConsumerConnector.commitOffsets(..)_ is executed during stop call in GetKafka. Looking at the thread dump it may be related to the fact that we are shutting down consumer when onTrigger is still executing. But It also appears that onTrigger is in the deadlock state as well.
Below are the relevant thread dump segments
{code}
"StandardProcessScheduler Thread-7" Id=115 BLOCKED on java.lang.Object@2baae51
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)
"ConsumerFetcherThread-74993895-50e9-3962-90e3-97af1fed7294_daves-nifi-cluster-2-1459431214410-d3b5143c-0-1001" Id=25120 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@23dc03a2
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:53)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:142)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:126)
at scala.Option.foreach(Option.scala:236)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:126)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:123)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:123)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:122)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Number of Locked Synchronizers: 1
- java.util.concurrent.locks.ReentrantLock$NonfairSync@6b99259f
"ConsumerFetcherThread-33156eec-156c-4b32-9598-d5fc3ca460ce_daves-nifi-cluster-2-1459519432175-5beddff4-0-1001" Id=35266 RUNNABLE (in native code)
at sun.nio.ch.Net.poll(Native Method)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
- waiting on java.lang.Object@3e91ba2b
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
- waiting on java.lang.Object@494070b0
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
- waiting on sun.nio.ch.SocketAdaptor$SocketInputStream@3ade0aef
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
- waiting on java.lang.Object@7d49c744
at kafka.utils.CoreUtils$.read(CoreUtils.scala:192)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:78)
- waiting on java.lang.Object@4eba7a24
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
"Timer-Driven Process Thread-10" Id=92 TIMED_WAITING on java.util.concurrent.CountDownLatch$Sync@7a46b8e8
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:67)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:48)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
at org.apache.nifi.processors.kafka.KafkaPublisher.processAcks(KafkaPublisher.java:152)
at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:141)
at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299)
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807)
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778)
at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:295)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Number of Locked Synchronizers: 1
- java.util.concurrent.ThreadPoolExecutor$Worker@2bc3a748
"StandardProcessScheduler Thread-7" Id=115 BLOCKED on java.lang.Object@2baae51
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:110)
at org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:297)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1332)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Number of Locked Synchronizers: 1
- java.util.concurrent.ThreadPoolExecutor$Worker@36a2bc42
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)