Posted to users@activemq.apache.org by boday <be...@initekconsulting.com> on 2013/07/22 21:46:19 UTC

consumer threads waiting on VMTransport locks...

seeing some thread blocking in our ActiveMQ 5.7 production application (using
VM transport and AMQ connection pool)...anyone know of any
configuration/known bugs that could be contributing to this?

seeing 20 threads like this one, WAITING for a single thread to release a
lock on 659ec0a4

"Camel (camel-1) thread #29 - JmsConsumer[MyQueue]" - Thread t@130
   java.lang.Thread.State: WAITING
	at sun.misc.Unsafe.park(Native Method)
	- waiting to lock <659ec0a4> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "Camel (camel-1) thread #13 - JmsConsumer[MyQueue]" t@94
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66)
	at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
	at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1290)
	at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1284)
	at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1898)
	at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2064)
	at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2059)
	at org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:921)
	- locked <7212432> (a java.util.LinkedList)
	at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:602)
	at org.apache.activemq.pool.PooledMessageConsumer.receive(PooledMessageConsumer.java:67)
	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:429)
	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)
	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:662)

   Locked ownable synchronizers:
	- locked <10cf5246> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)


then this thread appears to be holding the lock in question...659ec0a4


"Camel (camel-1) thread #13 - JmsConsumer[MyQueue]" - Thread t@94
   java.lang.Thread.State: WAITING
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <1ec8dc2f> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
	at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
	at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:93)
	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
	at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
	at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1290)
	at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1284)
	at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1898)
	at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2064)
	at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2059)
	at org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:921)
	- locked <119a6cfe> (a java.util.LinkedList)
	at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:602)
	at org.apache.activemq.pool.PooledMessageConsumer.receive(PooledMessageConsumer.java:67)
	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:429)
	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)
	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:662)

   Locked ownable synchronizers:
	- locked <659ec0a4> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
	- locked <3169a0b9> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
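
Read together, the two traces suggest a simple pattern: thread #13 has taken the lock in MutexTransport.oneway and is parked inside LinkedBlockingQueue.put (called from VMTransport.oneway), while thread #29 and the others queue up on that same lock. The sketch below reproduces only that pattern in plain Java. It is not ActiveMQ code; the class MutexOverBoundedQueue, its method names, and the capacity of 1 are assumptions made for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the pattern in the traces; not ActiveMQ source code.
final class MutexOverBoundedQueue {
    private final ReentrantLock writeLock = new ReentrantLock();
    private final LinkedBlockingQueue<String> queue;

    MutexOverBoundedQueue(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Serialises senders. If the queue is full, put() parks with writeLock still held.
    void oneway(String command) throws InterruptedException {
        writeLock.lock();
        try {
            queue.put(command);
        } finally {
            writeLock.unlock();
        }
    }

    // Stand-in for the reading end of the connection taking one command off the queue.
    String drainOne() throws InterruptedException {
        return queue.take();
    }

    // True if another sender could obtain the lock within the timeout.
    boolean senderCouldProceed(long timeoutMillis) throws InterruptedException {
        if (!writeLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false;
        }
        writeLock.unlock();
        return true;
    }

    static String demo() {
        MutexOverBoundedQueue transport = new MutexOverBoundedQueue(1);
        try {
            transport.oneway("ack-1"); // fills the only slot; nothing drains it yet

            Thread holder = new Thread(() -> {
                try {
                    transport.oneway("ack-2"); // takes the lock, then parks in put()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "holder");
            holder.setDaemon(true);
            holder.start();
            while (!transport.writeLock.isLocked()) {
                Thread.sleep(10); // wait until the holder owns the lock
            }

            boolean blockedWhileFull = !transport.senderCouldProceed(200);
            transport.drainOne(); // the reader catches up: put() completes, the lock is released
            holder.join(5000);
            boolean freeAfterDrain = transport.senderCouldProceed(1000);
            return "blockedWhileFull=" + blockedWhileFull + " freeAfterDrain=" + freeAfterDrain;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "blockedWhileFull=true freeAfterDrain=true"
    }
}
```

In this sketch every sender stays stuck for as long as nothing takes from the queue, which is why the state of whatever should be reading the other end matters more than the waiting threads themselves.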







--
View this message in context: http://activemq.2283324.n4.nabble.com/consumer-threads-waiting-on-VMTransport-locks-tp4669580.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: consumer threads waiting on VMTransport locks...

Posted by Gary Tully <ga...@gmail.com>.
It would be useful to see a full thread dump of the JVM, so we can see the
read end of that VM connection.
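
A full dump is usually taken from outside the process, for example with the JDK's `jstack -l <pid>`. Where that is awkward, the same information can be collected in-process through the standard java.lang.management API. The following is a minimal sketch under that assumption; the class name FullThreadDump is made up for illustration, and the output format is only loosely modelled on the dumps in this thread.

```java
import java.lang.management.LockInfo;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: renders all live threads with full stack traces and lock details.
final class FullThreadDump {

    static String dump() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Only ask for lock details the running JVM says it can report.
        boolean monitors = mx.isObjectMonitorUsageSupported();
        boolean synchronizers = mx.isSynchronizerUsageSupported();

        StringBuilder out = new StringBuilder();
        for (ThreadInfo info : mx.dumpAllThreads(monitors, synchronizers)) {
            out.append('"').append(info.getThreadName()).append("\" id=")
               .append(info.getThreadId()).append(" state=").append(info.getThreadState());
            if (info.getLockName() != null) {
                out.append(" waiting on ").append(info.getLockName());
            }
            if (info.getLockOwnerName() != null) {
                out.append(" owned by \"").append(info.getLockOwnerName()).append('"');
            }
            out.append('\n');
            // Walk the frames by hand so that deep stacks are printed in full.
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("\tat ").append(frame).append('\n');
            }
            for (LockInfo lock : info.getLockedSynchronizers()) {
                out.append("\t- locked ").append(lock).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

Because the broker is embedded here, a dump taken this way covers both the client threads and the broker-side threads that should be reading from the VM connection.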

On 24 July 2013 17:40, boday <be...@initekconsulting.com> wrote:
> at this point, I can't reproduce it on demand, just seeing random locking
> occur after hours of uptime...



-- 
http://redhat.com
http://blog.garytully.com

Re: consumer threads waiting on VMTransport locks...

Posted by boday <be...@initekconsulting.com>.
at this point, I can't reproduce it on demand, just seeing random locking
occur after hours of uptime...
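
Since the hang only shows up after hours of uptime, one option is to poll for the symptom and capture evidence when it appears. The sketch below, again using only java.lang.management, counts how many threads are waiting on a lock held by each owner thread. The class name LockWaitWatch and the demo method are hypothetical, and any alerting threshold would be an assumption to tune for the application.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: reports which threads are holding locks that others wait on.
final class LockWaitWatch {

    // Maps each lock-owning thread's name to the number of threads waiting on a lock it holds.
    static Map<String, Integer> waitersByOwner() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        Map<String, Integer> counts = new TreeMap<>();
        for (ThreadInfo info : mx.getThreadInfo(mx.getAllThreadIds())) {
            // info is null for a thread that ended between the two calls
            if (info != null && info.getLockOwnerName() != null) {
                counts.merge(info.getLockOwnerName(), 1, Integer::sum);
            }
        }
        return counts;
    }

    // Small self-check: the calling thread holds a lock while one other thread waits for it.
    static int demo() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            Thread waiter = new Thread(() -> {
                lock.lock();
                lock.unlock();
            }, "waiter");
            waiter.setDaemon(true);
            waiter.start();
            while (waiter.getState() != Thread.State.WAITING) {
                Thread.sleep(10); // wait until the waiter is parked on the lock
            }
            return waitersByOwner().getOrDefault(Thread.currentThread().getName(), 0);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads waiting on the main thread: " + demo());
    }
}
```

Run from a scheduled task, a check like this could write a full thread dump as soon as one owner has many waiters (20 in the dump above), so the state is captured without anyone having to catch it live.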


ceposta wrote
> Any chance you can reproduce this?
> 
> 
> On Tue, Jul 23, 2013 at 1:03 AM, boday <ben.oday@> wrote:
> 
>> no errors in the logs, but we see message flow resume after a server
>> restart...I'm starting to question our connection/server configuration a
>> bit now given this thread dump...
>>
>> our application uses camel-activemq (2.10.1) and runs embedded in an AMQ
>> broker (5.7)...we use the same connection factory/pool for all of our app's
>> Camel routes that produce/consume from queues, etc...
>>
>> here is my client code connection setup...
>>
>>     ActiveMQConnectionFactory queueConnectionFactory =
>>         new ActiveMQConnectionFactory("vm://localhost?create=false");
>>     queueConnectionFactory.setObjectMessageSerializationDefered(true);
>>     queueConnectionFactory.setOptimizedMessageDispatch(true);
>>     queueConnectionFactory.setDispatchAsync(true);
>>     queueConnectionFactory.setUseAsyncSend(true);
>>     queueConnectionFactory.setAlwaysSessionAsync(true);
>>
>>     PooledConnectionFactory pcf =
>>         new PooledConnectionFactory(queueConnectionFactory);
>>     pcf.setIdleTimeout(0);
>>
>>     ActiveMQComponent amq =
>>         ActiveMQComponent.activeMQComponent(brokerURL);
>>     amq.setConnectionFactory(pcf);
>>     amq.setMaxConcurrentConsumers(20);
>>     amq.setCacheLevelName("CACHE_CONSUMER");
>>     amq.setAcknowledgementModeName("AUTO_ACKNOWLEDGE");
>>     this.getContext().addComponent("activemq", amq);
>>
>> here is my activemq.xml...
>>
>>     <broker xmlns="http://activemq.apache.org/schema/core"
>>             brokerName="localhost" dataDirectory="../data" useJmx="true">
>>
>>         <destinationPolicy>
>>             <policyMap>
>>                 <policyEntries>
>>                     <policyEntry queue=">" producerFlowControl="false">
>>                         <pendingQueuePolicy>
>>                             <vmQueueCursor/>
>>                         </pendingQueuePolicy>
>>                     </policyEntry>
>>                 </policyEntries>
>>             </policyMap>
>>         </destinationPolicy>
>>
>>         <managementContext>
>>             <managementContext createConnector="false"/>
>>         </managementContext>
>>
>>         <networkConnectors>
>>         </networkConnectors>
>>
>>         <persistenceAdapter>
>>             <kahaDB directory="../data/kaha"
>>                     enableJournalDiskSyncs="false"
>>                     indexWriteBatchSize="10000"
>>                     indexCacheSize="1000" />
>>         </persistenceAdapter>
>>
>>         <systemUsage>
>>             <systemUsage>
>>                 <memoryUsage>
>>                     <memoryUsage limit="4 gb"/>
>>                 </memoryUsage>
>>                 <storeUsage>
>>                     <storeUsage limit="100 gb"/>
>>                 </storeUsage>
>>                 <tempUsage>
>>                     <tempUsage limit="10 gb"/>
>>                 </tempUsage>
>>             </systemUsage>
>>         </systemUsage>
>>
>>         <transportConnectors>
>>             <transportConnector name="openwire" uri="tcp://0.0.0.0:61668" />
>>             <transportConnector name="vm" uri="vm://localhost" />
>>         </transportConnectors>
>>     </broker>
>>
>>     <import resource="jndi.xml"/>
>>     <import resource="camel.xml"/>
>>
>>
>> ceposta wrote
>> > Did you see any exceptions in the broker logs?
>> > Seems like the server-side VMTransport stopped draining its message queue
>> > and processing consumer acks... causing the client VMTransport to block on
>> > a put...and all other consumers using that connection to block...






Re: consumer threads waiting on VMTransport locks...

Posted by Christian Posta <ch...@gmail.com>.
Any chance you can reproduce this?


On Tue, Jul 23, 2013 at 1:03 AM, boday <be...@initekconsulting.com> wrote:

> no errors in the logs, but we see message flow resume after a server
> restart...I'm starting to question our connection/server configuration a
> bit now given this thread dump...



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta

Re: consumer threads waiting on VMTransport locks...

Posted by boday <be...@initekconsulting.com>.
No errors in the logs, but message flow resumes after a server restart.
Given this thread dump, I'm starting to question our connection/server
configuration a bit.

Our application uses camel-activemq (2.10.1) and runs embedded in an AMQ
broker (5.7). We use the same connection factory/pool for all of our app's
Camel routes that produce to or consume from queues.

here is my client code connection setup...

    ActiveMQConnectionFactory queueConnectionFactory =
        new ActiveMQConnectionFactory("vm://localhost?create=false");
    queueConnectionFactory.setObjectMessageSerializationDefered(true);
    queueConnectionFactory.setOptimizedMessageDispatch(true);
    queueConnectionFactory.setDispatchAsync(true);
    queueConnectionFactory.setUseAsyncSend(true);
    queueConnectionFactory.setAlwaysSessionAsync(true);

    PooledConnectionFactory pcf = new PooledConnectionFactory(queueConnectionFactory);
    pcf.setIdleTimeout(0);

    ActiveMQComponent amq = ActiveMQComponent.activeMQComponent(brokerURL);
    amq.setConnectionFactory(pcf);
    amq.setMaxConcurrentConsumers(20);
    amq.setCacheLevelName("CACHE_CONSUMER");
    amq.setAcknowledgementModeName("AUTO_ACKNOWLEDGE");
    this.getContext().addComponent("activemq", amq);

here is my activemq.xml...

    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="localhost" dataDirectory="../data" useJmx="true">

        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue=">" producerFlowControl="false">
                        <pendingQueuePolicy>
                            <vmQueueCursor/>
                        </pendingQueuePolicy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <networkConnectors>
        </networkConnectors>

        <persistenceAdapter>
            <kahaDB directory="../data/kaha" 
                enableJournalDiskSyncs="false" 
                indexWriteBatchSize="10000" 
                indexCacheSize="1000" />
        </persistenceAdapter>

        <systemUsage>
            <systemUsage>
                <memoryUsage><memoryUsage limit="4 gb"/></memoryUsage>
                <storeUsage><storeUsage limit="100 gb"/></storeUsage>
                <tempUsage><tempUsage limit="10 gb"/></tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61668" />
            <transportConnector name="vm" uri="vm://localhost" />
        </transportConnectors>
    </broker>

    <import resource="jndi.xml"/>
    <import resource="camel.xml"/>


ceposta wrote
> Did you see any exceptions in the broker logs?
> Seems like the server-side VMTransport stopped draining its message queue
> and processing consumer acks... causing the client VMTransport to block on
> a put...and all other consumers using that connection to block...





--
View this message in context: http://activemq.2283324.n4.nabble.com/consumer-threads-waiting-on-VMTransport-locks-tp4669580p4669593.html

Re: consumer threads waiting on VMTransport locks...

Posted by Christian Posta <ch...@gmail.com>.
Did you see any exceptions in the broker logs?
It looks like the server-side VMTransport stopped draining its message queue
and processing consumer acks. That would cause the client-side VMTransport to
block on a put, and every other consumer sharing that connection to block
behind it.
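
As a rough illustration of that pattern (this is not ActiveMQ code), the
sketch below uses only plain java.util.concurrent classes: a bounded queue
that nobody drains, and one shared lock that every sender must take before
putting to it. The class name StalledQueueDemo, the names peerQueue and
sendLock, and the capacity of 1 are all made up for the example; they only
stand in for the transport queue and the MutexTransport lock seen in the
stack traces.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

public class StalledQueueDemo {

    /**
     * Starts (waiters + 1) sender threads against a full queue that is never
     * drained and returns how many of them end up queued on the shared lock.
     */
    public static int countThreadsStuckOnLock(int waiters) {
        // Hypothetical stand-ins for the transport queue and the send mutex.
        BlockingQueue<String> peerQueue = new LinkedBlockingQueue<>(1);
        ReentrantLock sendLock = new ReentrantLock();
        peerQueue.offer("already queued"); // the queue is now full

        Runnable send = () -> {
            sendLock.lock();
            try {
                // Parks until there is space; with no consumer that never happens.
                peerQueue.put("ack");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                sendLock.unlock();
            }
        };

        Thread[] senders = new Thread[waiters + 1];
        for (int i = 0; i < senders.length; i++) {
            senders[i] = new Thread(send, "sender-" + i);
            senders[i].setDaemon(true);
            senders[i].start();
        }

        // One sender holds the lock inside put(); wait for the rest to queue up behind it.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (sendLock.getQueueLength() < waiters && System.nanoTime() < deadline) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
        }
        int stuck = sendLock.getQueueLength();

        // Clean up: interrupting the senders lets each one fail its put() and release the lock.
        for (Thread t : senders) {
            t.interrupt();
        }
        return stuck;
    }

    public static void main(String[] args) {
        System.out.println("threads queued on the lock: " + countThreadsStuckOnLock(3));
    }
}
```

A thread dump taken while the senders are stuck should have the same shape as
the one posted: one thread parked in LinkedBlockingQueue.put while owning the
ReentrantLock, and the others waiting to lock it.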


On Mon, Jul 22, 2013 at 3:46 PM, boday <be...@initekconsulting.com> wrote:

> seeing some thread blocking in our ActiveMQ 5.7 production application
> (using VM transport and AMQ connection pool)...anyone know of any
> configuration/known bugs that could be contributing to this?



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta

Re: consumer threads waiting on VMTransport locks...

Posted by "damiano.fontana" <da...@epocaricerca.it>.
I have experienced the same issue in my production environment: an ActiveMQ
5.7 application using the VM transport and the AMQ connection pool.

Did you find a workaround to avoid this issue?
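
For comparing another hang against the dumps earlier in this thread, one
option is to ask the JVM which thread the blocked ones are waiting on. This is
only a diagnostic sketch using the standard java.lang.management API, not a
workaround; the class name LockWaitReport and the method waitersByOwner are
hypothetical, and the code has to run inside the affected JVM.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LockWaitReport {

    /** Groups the names of threads waiting on an owned lock by the name of the owning thread. */
    public static Map<String, List<String>> waitersByOwner() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Only request monitor/synchronizer details when the JVM supports them.
        ThreadInfo[] dump = mx.dumpAllThreads(
                mx.isObjectMonitorUsageSupported(),
                mx.isSynchronizerUsageSupported());

        Map<String, List<String>> result = new TreeMap<>();
        for (ThreadInfo info : dump) {
            String owner = info.getLockOwnerName(); // null when not waiting on an owned lock
            if (owner != null) {
                result.computeIfAbsent(owner, k -> new ArrayList<>()).add(info.getThreadName());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        waitersByOwner().forEach((owner, waiters) ->
                System.out.println(owner + " is holding up " + waiters.size()
                        + " thread(s): " + waiters));
    }
}
```

If the hang matches the one reported here, a single JmsConsumer thread should
show up as the owner with the other consumer threads listed behind it.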




--
View this message in context: http://activemq.2283324.n4.nabble.com/consumer-threads-waiting-on-VMTransport-locks-tp4669580p4674802.html