Posted to users@activemq.apache.org by tvijlbrief <to...@xs4all.nl> on 2007/02/17 18:33:47 UTC

Re: Blocking on UsageManager.waitForSpace again

I created a junit test which hangs after about 50 iterations.

Note that the behaviour differs from what I see with two standalone programs
and a TCP broker: in that case the producer can still send, but the consumer
blocks.

The unit test is attached to:

https://issues.apache.org/activemq/browse/AMQ-1136

Hope this helps.

Tom


rajdavies wrote:
> 
> Hi Albert,
> 
> it would be great if you could produce a junit test case and attach
> it to an issue in our issue tracker -
> http://issues.apache.org/activemq/secure/CreateIssue!default.jspa
> these are just the sort of cases we look for for testing n' fixing
> stuff!
> 
> cheers,
> 
> Rob
> On 21 Jan 2007, at 15:32, Albert Strasheim wrote:
> 
>> Hello all
>>
>> On Thu, 18 Jan 2007, James.Strachan wrote:
>>
>>> Have you tried setting the UsageManager configuration to something
>>> large? Out of the box it's set to something really tiny.
>>>
>>> BTW if you have an issue, please mention the version of the
>>> software you are using.
>>> http://incubator.apache.org/activemq/support.html
>>> James
>>
>> Some more details from the day's debugging.
>>
>> We're running ActiveMQ 4.2 from trunk as checked out on Friday.
>>
>> Our test has a thread that produces messages. On another connection in
>> the same application, we have a consumer with a message listener set.
>> This listener produces a new message.
>>
>> Debugging with Eclipse, I see that the first time the code goes into
>> UsageManager.waitForSpace, it is called from the thread that produces.
>> The stack looks like this:
>>
>> Thread [myapp$MyThread] (Suspended (breakpoint at line 91 in UsageManager))
>> 	UsageManager.waitForSpace() line: 91
>> 	UsageManager.waitForSpace() line: 88
>> 	Topic.send(ConnectionContext, Message) line: 248
>> 	ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) line: 305
>> 	ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) line: 381
>> 	TransactionBroker.send(ConnectionContext, Message) line: 197
>> 	AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126
>> 	CompositeDestinationBroker.send(ConnectionContext, Message) line: 98
>> 	BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) line: 136
>> 	TransportConnection.processMessage(Message) line: 449
>> 	ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604
>> 	TransportConnection.service(Command) line: 258
>> 	TransportConnection$1.onCommand(Object) line: 164
>> 	ResponseCorrelator.onCommand(Object) line: 95
>> 	MutexTransport(TransportFilter).onCommand(Object) line: 65
>> 	VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99
>> 	VMTransportServer$1(VMTransport).oneway(Object) line: 86
>> 	MutexTransport.oneway(Object) line: 44
>> 	ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69
>> 	ResponseCorrelator.request(Object) line: 74
>> 	ActiveMQConnection.syncSendPacket(Command) line: 1185
>> 	ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, Message, int, int, long) line: 1547
>> 	ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 473
>> 	ActiveMQMessageProducer.send(Message) line: 358
>>         <thread in my application calling send>
>>
>> If I then allow the VM to continue, it runs for a short while and then
>> the single remaining ActiveMQ Session Task thread blocks:
>>
>> Thread [ActiveMQ Session Task] (Suspended (breakpoint at line 91 in UsageManager))
>> 	UsageManager.waitForSpace() line: 91
>> 	UsageManager.waitForSpace() line: 88
>> 	Topic.send(ConnectionContext, Message) line: 248
>> 	ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) line: 305
>> 	ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) line: 381
>> 	TransactionBroker.send(ConnectionContext, Message) line: 197
>> 	AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126
>> 	CompositeDestinationBroker.send(ConnectionContext, Message) line: 98
>> 	BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) line: 136
>> 	TransportConnection.processMessage(Message) line: 449
>> 	ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604
>> 	TransportConnection.service(Command) line: 258
>> 	TransportConnection$1.onCommand(Object) line: 164
>> 	ResponseCorrelator.onCommand(Object) line: 95
>> 	MutexTransport(TransportFilter).onCommand(Object) line: 65
>> 	VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99
>> 	VMTransportServer$1(VMTransport).oneway(Object) line: 86
>> 	MutexTransport.oneway(Object) line: 44
>> 	ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69
>> 	ResponseCorrelator.request(Object) line: 74
>> 	ActiveMQConnection.syncSendPacket(Command) line: 1185
>> 	ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, Message, int, int, long) line: 1547
>> 	ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 473
>> 	ActiveMQMessageProducer.send(Message) line: 358
>> 	
>> 	<onMessage of listener in my application is called. it calls send.>
>> 	
>> 	ActiveMQMessageConsumer.dispatch(MessageDispatch) line: 870	
>> 	ActiveMQSessionExecutor.dispatch(MessageDispatch) line: 99	
>> 	ActiveMQSessionExecutor.iterate() line: 166	
>> 	PooledTaskRunner.runTask() line: 111	
>> 	PooledTaskRunner.access$1(PooledTaskRunner) line: 95	
>> 	PooledTaskRunner$1.run() line: 44	
>> 	ThreadPoolExecutor$Worker.runTask(Runnable) line: 885	
>> 	ThreadPoolExecutor$Worker.run() line: 907	
>> 	Thread.run() line: 619	
>>
>> There don't seem to be any other ActiveMQ threads around, so I can't
>> see how this thread is going to wake up from the
>>
>> usageMutex.wait();
>>
>> that it is blocking on.
>>
>> Any thoughts? Is it wrong to produce straight from a message listener?
>>
>> By the way, I noticed that increasing the memory available to the
>> memory manager from 20 MB to 128 MB actually makes the application
>> block sooner rather than later. I also experimented with some prefetch
>> policy settings, but these don't make much of a difference. Currently,
>> we're simply using
>>
>> vm://localhost?broker.useJmx=true&broker.persistent=false
>>
>> as our broker URL.
>>
>> We're using topics throughout. Sessions' acknowledge modes are all set
>> to DUPS_OK_ACKNOWLEDGE. We're doing some message selection with string
>> and integer properties.
>>
>> Cheers,
>>
>> Albert
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/Blocking-on-UsageManager.waitForSpace-again-tf3031460s2354.html#a9021999
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
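
[Editor's sketch] The situation Albert describes above, a listener thread that calls send and then sits in usageMutex.wait() with no other thread left to wake it, can be modelled in plain Java. This is only a simplified model, not ActiveMQ code: MemoryGate, its methods and the one-unit limit are hypothetical names invented for illustration, and it assumes that the memory counted against the limit is released only after the listener returns, which the thread does not confirm. A timeout is used so that the example terminates instead of hanging.

```java
import java.util.concurrent.TimeUnit;

public class WaitForSpaceSketch {

    /** Hypothetical stand-in for a broker memory limit; NOT ActiveMQ's UsageManager. */
    static final class MemoryGate {
        private final Object usageMutex = new Object();
        private final long limit;
        private long used;

        MemoryGate(long limit) {
            this.limit = limit;
        }

        /** Blocks while usage is at the limit; gives up after the timeout. */
        boolean waitForSpace(long timeoutMillis) {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            synchronized (usageMutex) {
                try {
                    while (used >= limit) {
                        long remaining = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                        if (remaining <= 0) {
                            return false; // nobody freed memory in time
                        }
                        usageMutex.wait(remaining);
                    }
                    return true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }

        void increase(long units) {
            synchronized (usageMutex) {
                used += units;
            }
        }

        void decrease(long units) {
            synchronized (usageMutex) {
                used -= units;
                usageMutex.notifyAll(); // wake any thread parked in waitForSpace
            }
        }
    }

    /** The listener thread tries to send while its own in-flight message still fills the limit. */
    public static boolean sendFromListenerThread(long timeoutMillis) {
        MemoryGate gate = new MemoryGate(1);
        gate.increase(1);                                 // message being handled by the listener
        boolean sent = gate.waitForSpace(timeoutMillis);  // no other thread can call decrease()
        gate.decrease(1);                                 // assumed to happen only after onMessage returns
        return sent;
    }

    /** Same starting point, but a second thread frees the memory while the sender waits. */
    public static boolean sendWhileAnotherThreadFrees(long timeoutMillis) {
        MemoryGate gate = new MemoryGate(1);
        gate.increase(1);
        Thread consumer = new Thread(() -> gate.decrease(1));
        consumer.start();
        boolean sent = gate.waitForSpace(timeoutMillis);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("send from listener thread succeeded: " + sendFromListenerThread(200));
        System.out.println("send with separate consumer succeeded: " + sendWhileAnotherThreadFrees(2000));
    }
}
```

In this model the first call can only time out, because the one thread able to free space is the thread that is waiting for it; the second call returns as soon as the other thread releases the memory. Whether the real broker behaves this way in Albert's setup is exactly what the test case in AMQ-1136 is meant to establish.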


Re: Blocking on UsageManager.waitForSpace again

Posted by Rob Davies <ra...@gmail.com>.
Ah - ok  that's useful - I'll see if I can reproduce

cheers,

Rob

On 17 Feb 2007, at 18:48, Tom wrote:

> On Saturday 17 February 2007 19:07, Rob Davies wrote:
>> thank you!!
>>
>
> Rob,
>
> I figured out that the SlowReceive1 junit test probably deadlocks
> because the connection is shared between sender and consumer.
>
> So I rewrote the test and now the test runs to completion.
>
> Halfway through the send the producer slows down and falls
> in sync with each received message.
>
> When I run my test programs with the tcp broker I still have the
> problem. The sender runs to completion, but the receiver blocks... :-(
>
> you can reproduce it by running my jmstransfer program:
>
> http://www.v7f.eu/public/jms/
>
> If you transfer a file of a few megabytes then often the receiver blocks:
>
> Scenario: Send the file:
>
> jmsstream --topic topic/BigDurable --dest demo --timeout 0 /vmlinuz
>
> Start the receiver:
>
> jmsstream --topic topic/BigDurable --ident MyName --dest demo --prefix ./ --verbose
>
> New dest: .//vmlinuz
> Request: 1
> Request: 2
> Request: 3
>
> it just hangs with 4 messages left in the queue...


Re: Blocking on UsageManager.waitForSpace again

Posted by Tom <to...@xs4all.nl>.
On Saturday 17 February 2007 19:07, Rob Davies wrote:
> thank you!!
>

Rob, 

I figured out that the SlowReceive1 junit test probably deadlocks
because the connection is shared between sender and consumer.

So I rewrote the test and now the test runs to completion.

Halfway through the send the producer slows down and falls
in sync with each received message.

When I run my test programs with the tcp broker I still have the
problem. The sender runs to completion, but the receiver blocks... :-(

you can reproduce it by running my jmstransfer program:

http://www.v7f.eu/public/jms/

If you transfer a file of a few megabytes then often the receiver blocks:

Scenario: Send the file:

jmsstream --topic topic/BigDurable --dest demo --timeout 0 /vmlinuz

Start the receiver:

jmsstream --topic topic/BigDurable --ident MyName --dest demo --prefix ./ --verbose

New dest: .//vmlinuz
Request: 1
Request: 2
Request: 3

it just hangs with 4 messages left in the queue...
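
[Editor's sketch] Tom's observation that the producer slows down partway through and then proceeds in step with each received message is what one would expect from any bounded buffer between a fast producer and a slow consumer. The following plain-Java analogy uses an ArrayBlockingQueue in place of the broker's memory limit; it is not ActiveMQ's implementation, and the class name, method name and parameters are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerThrottleSketch {

    /**
     * Pushes `total` items through a buffer of `capacity` slots while a slow
     * consumer drains it. Returns how many sends found the buffer full and
     * therefore had to wait for the consumer.
     */
    public static int countThrottledSends(int total, int capacity, long consumerDelayMillis) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    buffer.take();                      // receive one message
                    Thread.sleep(consumerDelayMillis);  // simulate slow processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        int throttled = 0;
        try {
            for (int i = 0; i < total; i++) {
                if (!buffer.offer(i)) { // buffer full: the producer is now paced by the consumer
                    throttled++;
                    buffer.put(i);      // blocks until take() frees a slot
                }
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return throttled;
    }

    public static void main(String[] args) {
        int throttled = countThrottledSends(50, 2, 5);
        System.out.println("sends that had to wait for the consumer: " + throttled);
    }
}
```

Once the buffer has filled, nearly every further send waits for one receive, which matches the "in sync" behaviour reported for the rewritten test. The analogy says nothing about why the receiver blocks in the TCP broker case.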

Re: Blocking on UsageManager.waitForSpace again

Posted by Rob Davies <ra...@gmail.com>.
thank you!!

On 17 Feb 2007, at 17:33, tvijlbrief wrote:

>
> I created a junit test which hangs after about 50 iterations.
>
> Note that the behaviour is not compatible with two standalone  
> programs and a
> tcp-broker.
> In that case the producer can still send but the consumer blocks...
>
> The unit test is attached to:
>
> https://issues.apache.org/activemq/browse/AMQ-1136
>
> Hope this helps.
>
> Tom