You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2014/05/22 18:15:02 UTC
[jira] [Closed] (AMQ-5190) deadlock between producer and consumer
[ https://issues.apache.org/jira/browse/AMQ-5190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Bish closed AMQ-5190.
-----------------------------
Resolution: Incomplete
No test case provided to validate this against a current broker release, 5.5.0 is ancient at this point.
> deadlock between producer and consumer
> --------------------------------------
>
> Key: AMQ-5190
> URL: https://issues.apache.org/jira/browse/AMQ-5190
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.5.0
> Environment: Linux, Spring, Java 6
> Reporter: Randy Prager
>
> We observed a deadlock in the broker, see the 2 below.
> # we use spring to configure the broker, producer and consumer
> # producer and consumer share the same pooled connection factory
> # we have producer flow control enabled
> h2. Deadlock
> {noformat}
> "incommingMessageListenerContainer-1" prio=10 tid=0x00007f5a7d0d8800 nid=0xdae5 waiting for monitor entry [0x00007f5a00cef000]
> java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:39)
> - waiting to lock <0x00000000c43cf2a8> (a java.lang.Object)
> at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1265)
> at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259)
> at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1863)
> at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2029)
> at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2024)
> at org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:871)
> - locked <0x00000000c5f111e8> (a java.util.LinkedList)
> at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:585)
> at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:429)
> at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
> at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
> at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1059)
> at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1051)
> at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:948)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}
> {noformat}
> "default-selector" prio=10 tid=0x00007f5a7cf35000 nid=0xdae8 waiting on condition [0x00007f5a009eb000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000c4354548> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:94)
> at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
> - locked <0x00000000c43cf2a8> (a java.lang.Object)
> at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1265)
> at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259)
> at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1744)
> - locked <0x00000000c6343788> (a java.lang.Object)
> at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
> at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
> - locked <0x00000000c62e6548> (a org.apache.activemq.ActiveMQMessageProducer)
> at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:59)
> at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:592)
> at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:569)
> at org.springframework.jms.core.JmsTemplate$3.doInJms(JmsTemplate.java:536)
> at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
> at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)
> {noformat}
> h2. Producer Flow Control Thread
> {noformat}
> "ActiveMQ Task-1" daemon prio=10 tid=0x00007f5a7d06f800 nid=0xdae0 in Object.wait() [0x00007f5a011f4000]
> java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.activemq.usage.Usage.waitForSpace(Usage.java:97)
> - locked <0x00000000c60c6130> (a java.lang.Object)
> at org.apache.activemq.broker.region.BaseDestination.waitForSpace(BaseDestination.java:589)
> at org.apache.activemq.broker.region.BaseDestination.waitForSpace(BaseDestination.java:573)
> at org.apache.activemq.broker.region.Topic.send(Topic.java:379)
> at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
> at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:523)
> at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
> at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
> at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:227)
> at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
> at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:458)
> at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:681)
> at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:306)
> at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
> at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:218)
> at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:127)
> at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}
> h2. Spring Bits
> {code:xml}
> <amq:broker useJmx="false" persistent="false">
>
> <amq:destinationPolicy>
> <amq:policyMap>
> <amq:policyEntries>
> <amq:policyEntry topic=">" producerFlowControl="false" memoryLimit="20mb">
> <amq:dispatchPolicy>
> <amq:strictOrderDispatchPolicy />
> </amq:dispatchPolicy>
> <amq:pendingSubscriberPolicy>
> <amq:fileCursor />
> </amq:pendingSubscriberPolicy>
> </amq:policyEntry>
> </amq:policyEntries>
> </amq:policyMap>
> </amq:destinationPolicy>
> <amq:systemUsage>
> <amq:systemUsage>
> <amq:memoryUsage>
> <amq:memoryUsage limit="100 mb"/>
> </amq:memoryUsage>
> <amq:storeUsage>
> <amq:storeUsage limit="100 gb"/>
> </amq:storeUsage>
> <amq:tempUsage>
> <amq:tempUsage limit="100 gb"/>
> </amq:tempUsage>
> </amq:systemUsage>
> </amq:systemUsage>
>
> <amq:transportConnectors>
> <amq:transportConnector uri="tcp://localhost:0" />
> </amq:transportConnectors>
>
> </amq:broker>
> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
> <property name="connectionFactory">
> <bean class="org.apache.activemq.ActiveMQConnectionFactory">
> <property name="brokerURL" value="vm://localhost" />
> <property name="useAsyncSend" value="true" />
> <property name="optimizeAcknowledge" value="false" />
> <property name="warnAboutUnstartedConnectionTimeout" value="500" />
> </bean>
> </property>
> </bean>
> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
> <property name="connectionFactory" ref="pooledConnectionFactory" />
> </bean>
> <bean id="DBTopic" class="org.apache.activemq.command.ActiveMQTopic">
> <constructor-arg value="db.topic" />
> </bean>
> <bean id="DBProducer" class="db.DBProducer">
> <property name="template" ref="jmsTemplate" />
> <property name="destination" ref="DBTopic" />
> </bean>
> <bean id="DBConsumer" class="db.DBConsumer" />
> <!-- <jms:listener-container connection-factory="pooledConnectionFactory"> -->
> <!-- <jms:listener destination="db.topic" ref="DBConsumer" method="messageReceived"
> /> -->
> <!-- </jms:listener-container> -->
> <bean id="incommingMessageListenerContainer"
> class="org.springframework.jms.listener.DefaultMessageListenerContainer">
> <property name="connectionFactory" ref="pooledConnectionFactory" />
> <property name="destination" ref="DBTopic" />
> <property name="messageListener" ref="DBConsumer" />
> <!-- Retry connection every 10 seconds. -->
> <property name="recoveryInterval" value="10000" />
> </bean>
> {code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)