You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2014/05/22 18:15:02 UTC

[jira] [Closed] (AMQ-5190) deadlock between producer and consumer

     [ https://issues.apache.org/jira/browse/AMQ-5190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Bish closed AMQ-5190.
-----------------------------

    Resolution: Incomplete

No test case provided to validate this against a current broker release, 5.5.0 is ancient at this point.  

> deadlock between producer and consumer
> --------------------------------------
>
>                 Key: AMQ-5190
>                 URL: https://issues.apache.org/jira/browse/AMQ-5190
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.0
>         Environment: Linux, Spring, Java 6
>            Reporter: Randy Prager
>
> We observed a deadlock in the broker, see the 2 below.
> # we use spring to configure the broker, producer and consumer
> # producer and consumer share the same pooled connection factory
> # we have producer flow control enabled
> h2. Deadlock
> {noformat}
> "incommingMessageListenerContainer-1" prio=10 tid=0x00007f5a7d0d8800 nid=0xdae5 waiting for monitor entry [0x00007f5a00cef000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> 	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:39)
> 	- waiting to lock <0x00000000c43cf2a8> (a java.lang.Object)
> 	at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> 	at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1265)
> 	at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259)
> 	at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1863)
> 	at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2029)
> 	at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2024)
> 	at org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:871)
> 	- locked <0x00000000c5f111e8> (a java.util.LinkedList)
> 	at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:585)
> 	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:429)
> 	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
> 	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
> 	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1059)
> 	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1051)
> 	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:948)
> 	at java.lang.Thread.run(Thread.java:662)
> {noformat}
> {noformat}
> "default-selector" prio=10 tid=0x00007f5a7cf35000 nid=0xdae8 waiting on condition [0x00007f5a009eb000]
>    java.lang.Thread.State: WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x00000000c4354548> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> 	at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> 	at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:94)
> 	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
> 	- locked <0x00000000c43cf2a8> (a java.lang.Object)
> 	at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> 	at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1265)
> 	at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259)
> 	at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1744)
> 	- locked <0x00000000c6343788> (a java.lang.Object)
> 	at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
> 	at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
> 	- locked <0x00000000c62e6548> (a org.apache.activemq.ActiveMQMessageProducer)
> 	at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:59)
> 	at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:592)
> 	at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:569)
> 	at org.springframework.jms.core.JmsTemplate$3.doInJms(JmsTemplate.java:536)
> 	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
> 	at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)
> {noformat}
> h2. Producer Flow Control Thread
> {noformat}
> "ActiveMQ Task-1" daemon prio=10 tid=0x00007f5a7d06f800 nid=0xdae0 in Object.wait() [0x00007f5a011f4000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> 	at java.lang.Object.wait(Native Method)
> 	at org.apache.activemq.usage.Usage.waitForSpace(Usage.java:97)
> 	- locked <0x00000000c60c6130> (a java.lang.Object)
> 	at org.apache.activemq.broker.region.BaseDestination.waitForSpace(BaseDestination.java:589)
> 	at org.apache.activemq.broker.region.BaseDestination.waitForSpace(BaseDestination.java:573)
> 	at org.apache.activemq.broker.region.Topic.send(Topic.java:379)
> 	at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
> 	at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:523)
> 	at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
> 	at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
> 	at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:227)
> 	at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
> 	at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:458)
> 	at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:681)
> 	at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:306)
> 	at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> 	at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> 	at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
> 	at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:218)
> 	at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:127)
> 	at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> 	at java.lang.Thread.run(Thread.java:662)
> {noformat}
> h2. Spring Bits
> {code:xml}
> <amq:broker useJmx="false" persistent="false">	
> 		
> 		<amq:destinationPolicy> 
>             <amq:policyMap> 
>               <amq:policyEntries> 
>                 <amq:policyEntry topic=">" producerFlowControl="false" memoryLimit="20mb">
>                 	<amq:dispatchPolicy>
>               			<amq:strictOrderDispatchPolicy />
>             		</amq:dispatchPolicy>
>             		<amq:pendingSubscriberPolicy>
>             			<amq:fileCursor />
>             		</amq:pendingSubscriberPolicy>
>                 </amq:policyEntry> 
>               </amq:policyEntries> 
>             </amq:policyMap> 
>         </amq:destinationPolicy> 
>          <amq:systemUsage> 
>             <amq:systemUsage> 
>                 <amq:memoryUsage> 
>                     <amq:memoryUsage limit="100 mb"/> 
>                 </amq:memoryUsage> 
>                 <amq:storeUsage> 
>                     <amq:storeUsage limit="100 gb"/> 
>                 </amq:storeUsage> 
>                 <amq:tempUsage> 
>                     <amq:tempUsage limit="100 gb"/> 
>                 </amq:tempUsage> 
>             </amq:systemUsage> 
>         </amq:systemUsage> 
>         
>         <amq:transportConnectors>
> 			<amq:transportConnector uri="tcp://localhost:0" />
> 		</amq:transportConnectors>
> 		
> 	</amq:broker>
> 	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
> 		<property name="connectionFactory">
> 			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
> 				<property name="brokerURL" value="vm://localhost" />
> 				<property name="useAsyncSend" value="true" />
> 				<property name="optimizeAcknowledge" value="false" />
> 				<property name="warnAboutUnstartedConnectionTimeout" value="500" />
> 			</bean>
> 		</property>
> 	</bean>
> 	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
> 		<property name="connectionFactory" ref="pooledConnectionFactory" />
> 	</bean>
> 	<bean id="DBTopic" class="org.apache.activemq.command.ActiveMQTopic">
> 		<constructor-arg value="db.topic" />
> 	</bean>
> 	<bean id="DBProducer" class="db.DBProducer">
> 		<property name="template" ref="jmsTemplate" />
> 		<property name="destination" ref="DBTopic" />
> 	</bean>
> 	<bean id="DBConsumer" class="db.DBConsumer" />
> 	<!-- <jms:listener-container connection-factory="pooledConnectionFactory"> -->
> 	<!-- <jms:listener destination="db.topic" ref="DBConsumer" method="messageReceived" 
> 		/> -->
> 	<!-- </jms:listener-container> -->
> 	<bean id="incommingMessageListenerContainer"
> 		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
> 		<property name="connectionFactory" ref="pooledConnectionFactory" />
> 		<property name="destination" ref="DBTopic" />
> 		<property name="messageListener" ref="DBConsumer" />
> 		<!-- Retry connection every 10 seconds. -->
> 		<property name="recoveryInterval" value="10000" />
> 	</bean>
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)