Posted to users@activemq.apache.org by Albert Strasheim <fu...@gmail.com> on 2007/01/18 01:26:16 UTC

Blocking on UsageManager.waitForSpace again

Hello all

We're having a problem where our application is blocking on 
UsageManager.waitForSpace for no obvious reason. The symptoms are very 
similar to what was discussed in this thread:

http://www.mail-archive.com/activemq-users@geronimo.apache.org/msg06288.html

We set up a few sessions with producers and consumers that send two types of 
messages, with all message sizes in the low kB range. We're using 
non-persistent, non-durable topics to communicate.

The application runs fine for about 2 minutes, and during this time 
MemoryPercentageUsed (checked with JConsole) remains below 10%. Then it 
suddenly goes up to 100% and the application stops. Our application isn't 
doing anything different at the point where this abrupt increase occurs. It 
produces and consumes the same messages as fast as the CPU allows the whole 
time until it blocks.

Running with the JDK 1.6.0 server VM without any extra arguments, the maximum 
heap size according to JConsole is 517 MB. It seems like the hang occurs 
each time exactly when the used heap size goes over 200 MB. Running 
with -Xms384m -Xmx512m, I see the used heap size go up to 300+ MB, then come 
back down to a little more than 200 MB, and then when it reaches 260 MB 
it hangs again.

Running the client VM, the hang happens when the used heap size goes over 
300 MB, each time.
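
The heap figures above were read from JConsole. As a minimal sketch (not part of the original post), the same figures can be logged from inside the application with the standard java.lang.management API, which makes it easier to line up heap usage with the moment the hang occurs. The class name HeapSampler is hypothetical. Note that this reports JVM heap usage only; it does not read the broker's MemoryPercentageUsed attribute.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical helper: logs the heap figures that JConsole shows on its Memory tab.
final class HeapSampler {
    private static final long MB = 1024L * 1024L;

    /** Formats the current heap usage in MB. */
    static String sample() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        // getMax() returns -1 when the maximum is undefined.
        String max = heap.getMax() < 0 ? "undefined" : (heap.getMax() / MB) + " MB";
        return "heap used=" + (heap.getUsed() / MB) + " MB"
                + ", committed=" + (heap.getCommitted() / MB) + " MB"
                + ", max=" + max;
    }

    public static void main(String[] args) throws InterruptedException {
        // Print one sample per second for a few seconds.
        for (int i = 0; i < 5; i++) {
            System.out.println(sample());
            Thread.sleep(1000);
        }
    }
}
```

Calling HeapSampler.sample() from a background thread in the test application would give a timestamped record to compare against the point where send() stops returning.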

I'm using Windows XP SP2 with a Core 2 Duo processor.

Any thoughts?

Cheers,

Albert Strasheim

P. S. JConsole doesn't report any deadlocks in ActiveMQ or our application, 
so it looks like it really is blocking on UsageManager.waitForSpace.
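
For reference, a deadlock check similar to the one JConsole offers can be run from code with the standard ThreadMXBean. This is a minimal sketch, not something from the original post, and the class name DeadlockCheck is hypothetical. A thread parked in Object.wait() that is never notified is not part of a lock cycle, so a check like this would be expected to report nothing for it, which is consistent with the observation above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: reports threads that are deadlocked on object monitors.
final class DeadlockCheck {
    /** Returns one line per deadlocked thread, or an empty string if there are none. */
    static String report() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long[] ids = threads.findMonitorDeadlockedThreads(); // null when no cycle exists
        if (ids == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : threads.getThreadInfo(ids, Integer.MAX_VALUE)) {
            if (info == null) {
                continue; // the thread ended between the two calls
            }
            sb.append(info.getThreadName())
              .append(" is ").append(info.getThreadState())
              .append(" on ").append(info.getLockName())
              .append(" owned by ").append(info.getLockOwnerName())
              .append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String report = report();
        System.out.println(report.isEmpty() ? "no monitor deadlock found" : report);
    }
}
```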



Re: Blocking on UsageManager.waitForSpace again

Posted by Albert Strasheim <13...@sun.ac.za>.
Rob,

On Mon, 22 Jan 2007, Rob Davies wrote:

> Hi Albert,
> 
> it would be great if you could produce a junit test case and attach it to
> an issue in our issue tracker -
> http://issues.apache.org/activemq/secure/CreateIssue!default.jspa
> these are just the sort of cases we look for for testing n' fixing stuff!

I've tried to reproduce this problem outside of our code in a test, but 
so far I've had no luck. I'll try again later today.

Cheers,

Albert

Re: Blocking on UsageManager.waitForSpace again

Posted by Rob Davies <ra...@gmail.com>.
Ah - ok  that's useful - I'll see if I can reproduce

cheers,

Rob

On 17 Feb 2007, at 18:48, Tom wrote:

> Op zaterdag 17 februari 2007 19:07, schreef Rob Davies:
>> thank you!!
>>
>
> Rob,
>
> I figured out that the SlowReceive1 junit test probably deadlocks
> because the connection is shared between sender and consumer.
>
> So I rewrote the test and now the test runs to completion.
>
> Halfway during the send the Producer slows down and gets
> in sync with each received message.
>
> When I run my test programs with the tcp broker I still have the
> problem. The sender runs to completion, but the receiver blocks... :-(
>
> you can reproduce it by running my jmstransfer program:
>
> http://www.v7f.eu/public/jms/
>
> If you transfer a file of a few megabytes then often the receiver  
> blocks:
>
> Scenario: Send the file:
>
> jmsstream --topic topic/BigDurable --dest demo --timeout 0 /vmlinuz
>
> Start the receiver:
>
> jmsstream --topic topic/BigDurable --ident MyName --dest demo --prefix ./ --verbose
>
> New dest: .//vmlinuz
> Request: 1
> Request: 2
> Request: 3
>
> it just hangs with 4 messages left in the queue...


Re: Blocking on UsageManager.waitForSpace again

Posted by Tom <to...@xs4all.nl>.
Op zaterdag 17 februari 2007 19:07, schreef Rob Davies:
> thank you!!
>

Rob, 

I figured out that the SlowReceive1 junit test probably deadlocks
because the connection is shared between sender and consumer.

So I rewrote the test and now the test runs to completion.

Halfway during the send the Producer slows down and gets
in sync with each received message.

When I run my test programs with the tcp broker I still have the
problem. The sender runs to completion, but the receiver blocks... :-(

you can reproduce it by running my jmstransfer program:

http://www.v7f.eu/public/jms/

If you transfer a file of a few megabytes then often the receiver blocks:

Scenario: Send the file:

jmsstream --topic topic/BigDurable --dest demo --timeout 0 /vmlinuz

Start the receiver:

jmsstream --topic topic/BigDurable --ident MyName --dest demo --prefix ./ --verbose

New dest: .//vmlinuz
Request: 1
Request: 2
Request: 3

it just hangs with 4 messages left in the queue...

Re: Blocking on UsageManager.waitForSpace again

Posted by Rob Davies <ra...@gmail.com>.
thank you!!

On 17 Feb 2007, at 17:33, tvijlbrief wrote:

>
> I created a junit test which hangs after about 50 iterations.
>
> Note that the behaviour is not the same as with two standalone programs
> and a tcp-broker.
> In that case the producer can still send but the consumer blocks...
>
> The unit test is attached to:
>
> https://issues.apache.org/activemq/browse/AMQ-1136
>
> Hope this helps.
>
> Tom


Re: Blocking on UsageManager.waitForSpace again

Posted by tvijlbrief <to...@xs4all.nl>.
I created a junit test which hangs after about 50 iterations.

Note that the behaviour is not the same as with two standalone programs and a
tcp-broker. 
In that case the producer can still send but the consumer blocks...

The unit test is attached to:

https://issues.apache.org/activemq/browse/AMQ-1136

Hope this helps.

Tom



-- 
View this message in context: http://www.nabble.com/Blocking-on-UsageManager.waitForSpace-again-tf3031460s2354.html#a9021999
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Blocking on UsageManager.waitForSpace again

Posted by Rob Davies <ra...@gmail.com>.
Hi Albert,

it would be great if you could produce a junit test case and attach it to
an issue in our issue tracker -
http://issues.apache.org/activemq/secure/CreateIssue!default.jspa
these are just the sort of cases we look for for testing n' fixing stuff!

cheers,

Rob
On 21 Jan 2007, at 15:32, Albert Strasheim wrote:

> Hello all
>
> On Thu, 18 Jan 2007, James.Strachan wrote:
>
>> Have you tried setting the UsageManager configuration to something large?
>> Out of the box it's set to something really tiny.
>>
>> BTW if you have an issue, please mention the version of the software you
>> are using.
>> http://incubator.apache.org/activemq/support.html
>> James
>
> Some more details from the day's debugging.
>
> We're running ActiveMQ 4.2 from trunk as checked out on Friday.
>
> Our test has a thread that produces messages. On another connection in
> the same application, we have a consumer with a message listener set.
> This listener produces a new message.
>
> Debugging with Eclipse, I see that the first time the code goes into
> UsageManager.waitForSpace, it is called from the thread that produces.
> The stack looks like this:
>
> Thread [myapp$MyThread] (Suspended (breakpoint at line 91 in UsageManager))
>     UsageManager.waitForSpace() line: 91
>     UsageManager.waitForSpace() line: 88
>     Topic.send(ConnectionContext, Message) line: 248
>     ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) line: 305
>     ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) line: 381
>     TransactionBroker.send(ConnectionContext, Message) line: 197
>     AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126
>     CompositeDestinationBroker.send(ConnectionContext, Message) line: 98
>     BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) line: 136
>     TransportConnection.processMessage(Message) line: 449
>     ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604
>     TransportConnection.service(Command) line: 258
>     TransportConnection$1.onCommand(Object) line: 164
>     ResponseCorrelator.onCommand(Object) line: 95
>     MutexTransport(TransportFilter).onCommand(Object) line: 65
>     VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99
>     VMTransportServer$1(VMTransport).oneway(Object) line: 86
>     MutexTransport.oneway(Object) line: 44
>     ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69
>     ResponseCorrelator.request(Object) line: 74
>     ActiveMQConnection.syncSendPacket(Command) line: 1185
>     ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, Message, int, int, long) line: 1547
>     ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 473
>     ActiveMQMessageProducer.send(Message) line: 358
>     <thread in my application calling send>
>
> If I then allow the VM to continue, it runs for a short while and then
> the single remaining ActiveMQ Session Task thread blocks:
>
> Thread [ActiveMQ Session Task] (Suspended (breakpoint at line 91 in UsageManager))
>     UsageManager.waitForSpace() line: 91
>     UsageManager.waitForSpace() line: 88
>     Topic.send(ConnectionContext, Message) line: 248
>     ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) line: 305
>     ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) line: 381
>     TransactionBroker.send(ConnectionContext, Message) line: 197
>     AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126
>     CompositeDestinationBroker.send(ConnectionContext, Message) line: 98
>     BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) line: 136
>     TransportConnection.processMessage(Message) line: 449
>     ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604
>     TransportConnection.service(Command) line: 258
>     TransportConnection$1.onCommand(Object) line: 164
>     ResponseCorrelator.onCommand(Object) line: 95
>     MutexTransport(TransportFilter).onCommand(Object) line: 65
>     VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99
>     VMTransportServer$1(VMTransport).oneway(Object) line: 86
>     MutexTransport.oneway(Object) line: 44
>     ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69
>     ResponseCorrelator.request(Object) line: 74
>     ActiveMQConnection.syncSendPacket(Command) line: 1185
>     ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, Message, int, int, long) line: 1547
>     ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 473
>     ActiveMQMessageProducer.send(Message) line: 358
>
>     <onMessage of listener in my application is called. it calls send.>
>
>     ActiveMQMessageConsumer.dispatch(MessageDispatch) line: 870
>     ActiveMQSessionExecutor.dispatch(MessageDispatch) line: 99
>     ActiveMQSessionExecutor.iterate() line: 166
>     PooledTaskRunner.runTask() line: 111
>     PooledTaskRunner.access$1(PooledTaskRunner) line: 95
>     PooledTaskRunner$1.run() line: 44
>     ThreadPoolExecutor$Worker.runTask(Runnable) line: 885
>     ThreadPoolExecutor$Worker.run() line: 907
>     Thread.run() line: 619
>
> There don't seem to be any other ActiveMQ threads around, so I can't
> see how this thread is going to wake up from the
>
> usageMutex.wait();
>
> that it is blocking on.
>
> Any thoughts? Is it wrong to produce straight from a message listener?
>
> By the way, I noticed that increasing the memory available to the
> memory manager from 20 MB to 128 MB actually makes the application block
> sooner rather than later. I also experimented with some prefetch policy
> settings, but these don't make much of a difference. Currently, we're
> simply using
>
> vm://localhost?broker.useJmx=true&broker.persistent=false
>
> as our broker URL.
>
> We're using topics throughout. Sessions' acknowledge modes are all set
> to DUPS_OK_ACKNOWLEDGE. We're doing some message selection with string
> and integer properties.
>
> Cheers,
>
> Albert


Re: Blocking on UsageManager.waitForSpace again

Posted by Albert Strasheim <fu...@gmail.com>.
Hello all,

On Mon, 22 Jan 2007, James Strachan wrote:

> FWIW I'd definitely recommend you use non-persistent sending on your
> producers (otherwise the spool to disk won't kick in). Also are you
> actually ack'ing your messages?

Thanks, I'll try setting DeliveryMode.NON_PERSISTENT on our producers.

> On 1/21/07, Albert Strasheim <13...@sun.ac.za> wrote:
> >We're using topics throughout. Sessions' acknowledge modes are all set
> >to DUPS_OK_ACKNOWLEDGE. We're doing some message selection with string
> >and integer properties.

Is setting our consumers to DUPS_OK_ACKNOWLEDGE sufficient or should we 
be ack'ing messages some other way?

Cheers,

Albert

Re: Blocking on UsageManager.waitForSpace again

Posted by Rob Davies <ra...@gmail.com>.
Hi Albert,

a junit test case to reproduce this will really help speed up a  
resolution :)

cheers,

Rob
On 22 Jan 2007, at 20:10, Albert Strasheim wrote:

> Hello all
>
> On Mon, 22 Jan 2007, James Strachan wrote:
>
>> FWIW I'd definitely recommend you use non-persistent sending on your
>> producers (otherwise the spool to disk won't kick in). Also are you
>> actually ack'ing your messages?
>
> First off, we're using ActiveMQ 4.2 from trunk.
>
> I've taken a closer look at our application and I think I'm beginning to
> understand what's going on.
>
> We have a non-persistent producer sending to a non-durable consumer
> through a topic. Sessions are set to auto-acknowledge. We're seeing
> issues when running our test code, which uses an embedded broker, but I
> think these issues will also crop up with a separate broker.
>
> The producer is sending relatively large messages (1--2 kB) as fast as
> the data they contain can be read from disk. The consumer subscribed to
> the topic does a lot of processing on these messages (i.e. the
> consumer is slow) before sending a few smaller messages to another
> topic. What usually happens is that this send blocks, due to
> MemoryUsagePercent being 100. Since the consumer blocks while trying to
> send the small message, it no longer consumes any of the large messages
> and our application stops.
>
> The same thing happens even when the consumer simply consumes the large
> message and puts it into a Java queue serviced by another thread. The
> rapidly produced big messages fill up the broker's memory, causing the
> thread to block when trying to send a small message after taking a big
> message from the queue.
>
> In our case, we definitely don't want to drop messages if the consumer
> can't keep up. Instead, we want the fast producer to block, without
> using up all the broker's resources and preventing the slow consumers
> from actually handling the messages they receive (which involves sending
> another message).
>
> I seem to recall that I read somewhere in the mailing list archives or
> in one of the JIRA issues about a per-destination buffer. It seems like
> this would solve our problem: as long as there's a bit of space for the
> slow consumers to send out their messages, they can keep going and the
> producer can send new big messages as space becomes available.
>
> Should this be happening already? Are we doing something wrong?
>
> Any comments appreciated.
>
> Cheers,
>
> Albert
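
The behaviour asked for in the quoted message (a fast producer that blocks once a fixed amount of buffer space is used, so a slow consumer is never starved) can be illustrated outside ActiveMQ with a bounded queue. This is a minimal sketch using only java.util.concurrent; it is not ActiveMQ's API and says nothing about how the broker implements its memory limit. The class name BoundedHandoff and the sizes used are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of backpressure: put() blocks while the buffer is full.
final class BoundedHandoff {
    /** Runs a fast producer against a slow consumer and returns the largest backlog seen. */
    static int run(int messages, int capacity) {
        BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(capacity);

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < messages; i++) {
                    buffer.take();   // blocks while the buffer is empty
                    Thread.sleep(5); // simulate slow processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "slow-consumer");
        consumer.start();

        int maxBacklog = 0;
        try {
            for (int i = 0; i < messages; i++) {
                buffer.put(new byte[2048]); // blocks while the buffer is full
                maxBacklog = Math.max(maxBacklog, buffer.size());
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxBacklog;
    }

    public static void main(String[] args) {
        System.out.println("largest backlog: " + run(50, 4));
    }
}
```

The producer here is slowed to the consumer's pace and the backlog never exceeds the chosen capacity, which is the outcome described as desirable in the message above.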


Re: Blocking on UsageManager.waitForSpace again

Posted by Albert Strasheim <13...@sun.ac.za>.
Hello all

After hacking up most of our application, I came up with the following 
simple (I hope?) test case:

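import java.io.IOException;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTopic;

public final class SlowConsumerTest {
    public static void main(final String[] args) throws JMSException, InterruptedException, IOException {
        // Slow consumer: takes 100 ms to handle each message.
        class Listener implements MessageListener {
            public void onMessage(final Message message) {
                System.out.println("GOT A MESSAGE BEING SLOW");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        final Topic topic = new ActiveMQTopic("topic");
        // Embedded, non-persistent broker reached over the vm transport.
        final String brokerURL = "vm://localhost?broker.useJmx=false&broker.persistent=false";
        Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
        // Producer and consumer share one connection and one session.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createConsumer(topic).setMessageListener(new Listener());
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        connection.start();
        // Fast producer: sends 2048-byte messages as quickly as possible.
        while (true) {
            BytesMessage message = new ActiveMQBytesMessage();
            try {
                message.writeBytes(new byte[2048]);
                producer.send(message);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}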

Whether it hangs or not seems to depend on the message size and 
the VM's heap size arguments.

With 2048 byte messages with -Xms384m -Xmx512m it hangs reliably within 
about 7 messages received by the slow consumer on Windows XP SP2 with 
JDK 1.5.0_10 and 1.6.0 and on Linux with JDK 1.6.0.

In cases where it doesn't hang, one typically sees an "out of heap space" 
error after about 20 or 30 messages, but the messages keep going for 
some reason.

When it hangs, there are 2 or 3 threads running.

The main thread's stack:

Name: main
State: WAITING on java.lang.Object@eff545
Total blocked: 2  Total waited: 2

Stack trace: 
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:485)
org.apache.activemq.memory.UsageManager.waitForSpace(UsageManager.java:91)
org.apache.activemq.memory.UsageManager.waitForSpace(UsageManager.java:88)
org.apache.activemq.broker.region.Topic.send(Topic.java:248)
org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:305)
org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:381)
org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:197)
org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:98)
org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:136)
org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:449)
org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:604)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:258)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:164)
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:95)
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:65)
org.apache.activemq.transport.vm.VMTransport.syncOneWay(VMTransport.java:99)
org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:86)
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
   - locked java.lang.Object@10cc9b4
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1165)
org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1545)
   - locked java.lang.Object@f3e133
org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:473)
org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:358)
SlowConsumerTest.main(SlowConsumerTest.java:41)

The ActiveMQ Session task's stack:

Name: ActiveMQ Session Task
State: BLOCKED on java.lang.Object@10cc9b4 owned by: main
Total blocked: 3  Total waited: 8

Stack trace: 
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:43)
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1165)
org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1647)
org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:700)
org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:871)
   - locked java.lang.Object@1079781
org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:99)
org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:166)
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:111)
org.apache.activemq.thread.PooledTaskRunner.access$1(PooledTaskRunner.java:95)
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:44)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
java.lang.Thread.run(Thread.java:619)

The third thread is some kind of timer thread that disappears after a 
while.
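
The two traces above suggest a pattern: the main thread waits for space 
while still holding the lock java.lang.Object@10cc9b4 taken in 
MutexTransport.oneway, and the session task needs that same lock to send 
a packet after consuming a message (presumably an acknowledgement). What 
follows is a minimal plain-Java sketch of that pattern, not ActiveMQ 
code. The class HeldLockStallSketch, its transportLock and space fields, 
and all timeouts are hypothetical stand-ins chosen for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class HeldLockStallSketch {
    // Hypothetical stand-ins: transportLock plays the role of the monitor
    // seen in MutexTransport.oneway, space plays the role of broker memory.
    private final ReentrantLock transportLock = new ReentrantLock();
    private final Semaphore space;

    HeldLockStallSketch(int capacity) {
        this.space = new Semaphore(capacity);
    }

    /** Producer path: take the transport lock, then wait for space while holding it. */
    boolean send(long timeoutMs) throws InterruptedException {
        transportLock.lock();
        try {
            return space.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            transportLock.unlock();
        }
    }

    /** Consumer path: the ack would free space, but it needs the same lock first. */
    boolean ack(long timeoutMs) throws InterruptedException {
        if (!transportLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false; // locked out by the waiting producer
        }
        try {
            space.release();
            return true;
        } finally {
            transportLock.unlock();
        }
    }

    /** Returns true if the producer timed out and the ack could not get through. */
    static boolean demonstrateStall() throws InterruptedException {
        HeldLockStallSketch s = new HeldLockStallSketch(1);
        s.send(10); // fills the only slot
        final boolean[] sent = new boolean[1];
        Thread producer = new Thread(() -> {
            try {
                sent[0] = s.send(500); // waits for space with the lock held
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        while (!s.transportLock.isLocked()) {
            Thread.sleep(1); // wait until the producer owns the lock
        }
        boolean acked = s.ack(100); // cannot free space: lock is taken
        producer.join();
        return !sent[0] && !acked;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stalled: " + demonstrateStall());
    }
}
```

In this sketch the timeouts are the only reason either thread returns. 
With untimed waits, as in the traces, neither side would make progress.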

We tested with the latest ActiveMQ 4.2 from trunk and 4.2 checked out 
a few weeks ago -- same problem in both cases.

We'd simply like our producer to block waiting for space if the 
consumers can't keep up -- this is what I expected this code to do (or 
am I mistaken?). Naturally, if the consumers have a few messages to 
send, we don't want them to block too, so some kind of per-destination 
buffer would help here.

Should we be doing something differently to achieve this? We don't want 
to spool to disk, since our data is already being read from a disk, so 
if the producer dies, it can just restart and resend the data.

Thanks for all the help so far.

Cheers,

Albert

On Tue, 23 Jan 2007, Rob Davies wrote:

> reproduced - will fix shortly
> On 23 Jan 2007, at 07:26, James Strachan wrote:
> 
> >I thought that the new spool-to-disk feature should kick in
> >automatically - obviously not :). Anyone know if that has to be
> >explicitly enabled?
> >
> >BTW are you explicitly disabling persistence on ActiveMQ? I think you
> >might need to keep it enabled for spooling to disk to work.

Re: Blocking on UsageManager.waitForSpace again

Posted by Rob Davies <ra...@gmail.com>.
reproduced - will fix shortly
On 23 Jan 2007, at 07:26, James Strachan wrote:

> I thought that the new spool-to-disk feature should kick in
> automatically - obviously not :). Anyone know if that has to be
> explicitly enabled?
>
> BTW are you explicitly disabling persistence on ActiveMQ? I think you
> might need to keep it enabled for spooling to disk to work.


Re: Blocking on UsageManager.waitForSpace again

Posted by Rob Davies <ra...@gmail.com>.
it should be spooling to disk - waiting for a test case :)
cheers,

Rob
On 23 Jan 2007, at 07:26, James Strachan wrote:

> I thought that the new spool-to-disk feature should kick in
> automatically - obviously not :). Anyone know if that has to be
> explicitly enabled?
>
> BTW are you explicitly disabling persistence on ActiveMQ? I think you
> might need to keep it enabled for spooling to disk to work.


Re: Blocking on UsageManager.waitForSpace again

Posted by James Strachan <ja...@gmail.com>.
I thought that the new spool-to-disk feature should kick in
automatically - obviously not :). Anyone know if that has to be
explicitly enabled?

BTW are you explicitly disabling persistence on ActiveMQ? I think you
might need to keep it enabled for spooling to disk to work.


On 1/22/07, Albert Strasheim <13...@sun.ac.za> wrote:
> Hello all
>
> On Mon, 22 Jan 2007, James Strachan wrote:
>
> > FWIW I'd definitely recommend you use non-persistent sending on your
> > producers (otherwise the spool to disk won't kick in). Also are you
> > actually ack'ing your messages?
>
> First off, we're using ActiveMQ 4.2 from trunk.
>
> I've taken a closer look at our application and I think I'm beginning to
> understand what's going on.
>
> We have a non-persistent producer sending to a non-durable consumer
> through a topic. Sessions are set to auto-acknowledge. We're seeing
> issues when running our test code, which uses an embedded broker, but I
> think these issues will also crop up with a separate broker.
>
> The producer is sending relatively large messages (1--2 kB) as fast as
> the data they contain can be read from disk. The consumer subscribed to
> the topic does a lot of processing on these messages (i.e. the
> consumer is slow) before sending a few smaller messages to another
> topic. What usually happens is that this send blocks, due to
> MemoryPercentageUsed being 100. Since the consumer blocks while trying to
> send the small message, it no longer consumes any of the large messages
> and our application stops.
>
> The same thing happens even when the consumer simply consumes the large
> message and puts it into a Java queue serviced by another thread. The
> rapidly produced big messages fill up the broker's memory, causing the
> thread to block when trying to send a small message after taking a big
> message from the queue.
>
> In our case, we definitely don't want to drop messages if the consumer
> can't keep up. Instead, we want the fast producer to block, without
> using up all the broker's resources, preventing the slow consumers from
> actually handling the messages they receive (which involves sending
> another message).
>
> I seem to recall that I read somewhere in the mailing list archives or
> in one of the JIRA issues about a per-destination buffer. It seems like
> this would solve our problem: as long as there's a bit of space for the
> slow consumers to send out their messages, they can keep going and the
> producer can send new big messages as space becomes available.
>
> Should this be happening already? Are we doing something wrong?
>
> Any comments appreciated.
>
> Cheers,
>
> Albert
>


-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Blocking on UsageManager.waitForSpace again

Posted by Albert Strasheim <13...@sun.ac.za>.
Hello all

On Mon, 22 Jan 2007, James Strachan wrote:

> FWIW I'd definitely recommend you use non-persistent sending on your
> producers (otherwise the spool to disk won't kick in). Also are you
> actually ack'ing your messages?

First off, we're using ActiveMQ 4.2 from trunk.

I've taken a closer look at our application and I think I'm beginning to 
understand what's going on.

We have a non-persistent producer sending to a non-durable consumer
through a topic. Sessions are set to auto-acknowledge. We're seeing 
issues when running our test code, which uses an embedded broker, but I 
think these issues will also crop up with a separate broker.

The producer is sending relatively large messages (1--2 kB) as fast as 
the data they contain can be read from disk. The consumer subscribed to 
the topic does a lot of processing on these messages (i.e. the 
consumer is slow) before sending a few smaller messages to another 
topic. What usually happens is that this send blocks, due to 
MemoryPercentageUsed being 100. Since the consumer blocks while trying to 
send the small message, it no longer consumes any of the large messages 
and our application stops.

The same thing happens even when the consumer simply consumes the large 
message and puts it into a Java queue serviced by another thread. The 
rapidly produced big messages fill up the broker's memory, causing the 
thread to block when trying to send a small message after taking a big 
message from the queue.
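
To make the hand-off described above concrete, here is a minimal 
plain-Java sketch of a listener that only enqueues and a worker thread 
that does the follow-up work. It uses no JMS types. The class 
ListenerHandoffSketch, the String message bodies, the "summary:" output 
and the stop marker are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ListenerHandoffSketch {
    // Hypothetical stop marker; a real message with this body would also stop the worker.
    private static final String STOP = "__stop__";

    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    // Written only by the worker thread; read by the caller after join().
    private final List<String> produced = new ArrayList<>();
    private final Thread worker = new Thread(this::drain, "handoff-worker");

    void start() {
        worker.start();
    }

    /** Listener side: enqueue the big message and return immediately. */
    void onMessage(String body) {
        pending.add(body);
    }

    /** Worker side: take each big message and build the small follow-up message. */
    private void drain() {
        try {
            for (String body = pending.take(); !body.equals(STOP); body = pending.take()) {
                produced.add("summary:" + body.length());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stops the worker and returns the follow-up messages it built, in order. */
    List<String> stopAndCollect() throws InterruptedException {
        pending.add(STOP);
        worker.join();
        return produced;
    }
}
```

The sketch only shows the threading shape. In our application the 
worker's step would be a send to the second topic, which is exactly the 
call that ends up blocking.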

In our case, we definitely don't want to drop messages if the consumer 
can't keep up. Instead, we want the fast producer to block, without 
using up all the broker's resources, preventing the slow consumers from 
actually handling the messages they receive (which involves sending 
another message).

I seem to recall that I read somewhere in the mailing list archives or 
in one of the JIRA issues about a per-destination buffer. It seems like 
this would solve our problem: as long as there's a bit of space for the 
slow consumers to send out their messages, they can keep going and the 
producer can send new big messages as space becomes available.
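
As a rough illustration of the per-destination buffer idea, the sketch 
below gives every destination its own byte budget, so a full budget for 
the big-message topic does not stop sends to the small-message topic. 
This is plain Java and not an ActiveMQ feature or API; the class 
PerDestinationBudgetSketch, its method names and the destination names 
used with it are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class PerDestinationBudgetSketch {
    private final int bytesPerDestination;
    private final Map<String, Semaphore> budgets = new ConcurrentHashMap<>();

    PerDestinationBudgetSketch(int bytesPerDestination) {
        this.bytesPerDestination = bytesPerDestination;
    }

    // Each destination lazily gets its own budget of permits (one permit per byte).
    private Semaphore budgetFor(String destination) {
        return budgets.computeIfAbsent(destination, d -> new Semaphore(bytesPerDestination));
    }

    /** Blocks up to timeoutMs for room in this destination's budget only. */
    boolean reserve(String destination, int bytes, long timeoutMs) throws InterruptedException {
        return budgetFor(destination).tryAcquire(bytes, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Called once a message has been consumed, returning its bytes to the budget. */
    void release(String destination, int bytes) {
        budgetFor(destination).release(bytes);
    }

    public static void main(String[] args) throws InterruptedException {
        PerDestinationBudgetSketch budget = new PerDestinationBudgetSketch(4096);
        System.out.println(budget.reserve("big.topic", 4096, 10));  // fills big.topic
        System.out.println(budget.reserve("big.topic", 1, 10));     // big.topic is full
        System.out.println(budget.reserve("small.topic", 512, 10)); // separate budget
    }
}
```

With a single shared budget the third call would fail as well, which is 
the situation our slow consumers are in.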

Should this be happening already? Are we doing something wrong?

Any comments appreciated.

Cheers,

Albert

Re: Blocking on UsageManager.waitForSpace again

Posted by James Strachan <ja...@gmail.com>.
FWIW I'd definitely recommend you use non-persistent sending on your
producers (otherwise the spool to disk won't kick in). Also are you
actually ack'ing your messages?

On 1/21/07, Albert Strasheim <13...@sun.ac.za> wrote:
> Hello all
>
> On Thu, 18 Jan 2007, James.Strachan wrote:
>
> > Have you tried setting the UsageManager configuration to something large? Out
> > of the box it's set to something really tiny.
> >
> > BTW if you have an issue, please mention the version of the software you are
> > using.
> > http://incubator.apache.org/activemq/support.html
> > James
>
> Some more details from the day's debugging.
>
> We're running ActiveMQ 4.2 from trunk as checked out on Friday.
>
> Our test has a thread that produces messages. On another connection in
> the same application, we have a consumer with a message listener set.
> This listener produces a new message.
>
> Debugging with Eclipse, I see that the first time the code goes into
> UsageManager.waitForSpace, it is called from the thread that produces.
> The stack looks like this:
>
> Thread [myapp$MyThread] (Suspended (breakpoint at line 91 in UsageManager))
>         UsageManager.waitForSpace() line: 91
>         UsageManager.waitForSpace() line: 88
>         Topic.send(ConnectionContext, Message) line: 248
>         ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) line: 305
>         ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) line: 381
>         TransactionBroker.send(ConnectionContext, Message) line: 197
>         AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126
>         CompositeDestinationBroker.send(ConnectionContext, Message) line: 98
>         BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) line: 136
>         TransportConnection.processMessage(Message) line: 449
>         ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604
>         TransportConnection.service(Command) line: 258
>         TransportConnection$1.onCommand(Object) line: 164
>         ResponseCorrelator.onCommand(Object) line: 95
>         MutexTransport(TransportFilter).onCommand(Object) line: 65
>         VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99
>         VMTransportServer$1(VMTransport).oneway(Object) line: 86
>         MutexTransport.oneway(Object) line: 44
>         ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69
>         ResponseCorrelator.request(Object) line: 74
>         ActiveMQConnection.syncSendPacket(Command) line: 1185
>         ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, Message, int, int, long) line: 1547
>         ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 473
>         ActiveMQMessageProducer.send(Message) line: 358
>         <thread in my application calling send>
>
> If I then allow the VM to continue, it runs for a short while and then
> the single remaining ActiveMQ Session Task thread blocks:
>
> Thread [ActiveMQ Session Task] (Suspended (breakpoint at line 91 in UsageManager))
>         UsageManager.waitForSpace() line: 91
>         UsageManager.waitForSpace() line: 88
>         Topic.send(ConnectionContext, Message) line: 248
>         ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) line: 305
>         ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) line: 381
>         TransactionBroker.send(ConnectionContext, Message) line: 197
>         AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126
>         CompositeDestinationBroker.send(ConnectionContext, Message) line: 98
>         BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) line: 136
>         TransportConnection.processMessage(Message) line: 449
>         ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604
>         TransportConnection.service(Command) line: 258
>         TransportConnection$1.onCommand(Object) line: 164
>         ResponseCorrelator.onCommand(Object) line: 95
>         MutexTransport(TransportFilter).onCommand(Object) line: 65
>         VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99
>         VMTransportServer$1(VMTransport).oneway(Object) line: 86
>         MutexTransport.oneway(Object) line: 44
>         ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69
>         ResponseCorrelator.request(Object) line: 74
>         ActiveMQConnection.syncSendPacket(Command) line: 1185
>         ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, Message, int, int, long) line: 1547
>         ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 473
>         ActiveMQMessageProducer.send(Message) line: 358
>
>         <onMessage of listener in my application is called. it calls send.>
>
>         ActiveMQMessageConsumer.dispatch(MessageDispatch) line: 870
>         ActiveMQSessionExecutor.dispatch(MessageDispatch) line: 99
>         ActiveMQSessionExecutor.iterate() line: 166
>         PooledTaskRunner.runTask() line: 111
>         PooledTaskRunner.access$1(PooledTaskRunner) line: 95
>         PooledTaskRunner$1.run() line: 44
>         ThreadPoolExecutor$Worker.runTask(Runnable) line: 885
>         ThreadPoolExecutor$Worker.run() line: 907
>         Thread.run() line: 619
>
> There don't seem to be any other ActiveMQ threads around, so I can't
> see how this thread is going to wake up from the
>
> usageMutex.wait();
>
> that it is blocking on.
>
> Any thoughts? Is it wrong to produce straight from a message listener?
>
> By the way, I noticed that increasing the memory available to the
> memory manager from 20 MB to 128 MB actually makes the application block
> sooner rather than later. I also experimented with some prefetch policy
> settings, but these don't make much of a difference. Currently, we're
> simply using
>
> vm://localhost?broker.useJmx=true&broker.persistent=false
>
> as our broker URL.
>
> We're using topics throughout. Sessions' acknowledge modes are all set
> to DUPS_OK_ACKNOWLEDGE. We're doing some message selection with string
> and integer properties.
>
> Cheers,
>
> Albert
>


-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Blocking on UsageManager.waitForSpace again

Posted by Albert Strasheim <13...@sun.ac.za>.
Hello all

On Thu, 18 Jan 2007, James.Strachan wrote:

> Have you tried setting the UsageManager configuration to something large? Out
> of the box it's set to something really tiny.
> 
> BTW if you have an issue, please mention the version of the software you are
> using.
> http://incubator.apache.org/activemq/support.html
> James

Some more details from the day's debugging.

We're running ActiveMQ 4.2 from trunk as checked out on Friday.

Our test has a thread that produces messages. On another connection in 
the same application, we have a consumer with a message listener set. 
This listener produces a new message.

Debugging with Eclipse, I see that the first time the code goes into 
UsageManager.waitForSpace, it is called from the thread that produces. 
The stack looks like this:

Thread [myapp$MyThread] (Suspended (breakpoint at line 91 in UsageManager))	
	UsageManager.waitForSpace() line: 91	
	UsageManager.waitForSpace() line: 88	
	Topic.send(ConnectionContext, Message) line: 248	
	ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) line: 305	
	ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) line: 381	
	TransactionBroker.send(ConnectionContext, Message) line: 197	
	AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126	
	CompositeDestinationBroker.send(ConnectionContext, Message) line: 98	
	BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) line: 136	
	TransportConnection.processMessage(Message) line: 449	
	ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604	
	TransportConnection.service(Command) line: 258	
	TransportConnection$1.onCommand(Object) line: 164	
	ResponseCorrelator.onCommand(Object) line: 95	
	MutexTransport(TransportFilter).onCommand(Object) line: 65	
	VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99	
	VMTransportServer$1(VMTransport).oneway(Object) line: 86	
	MutexTransport.oneway(Object) line: 44	
	ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69	
	ResponseCorrelator.request(Object) line: 74	
	ActiveMQConnection.syncSendPacket(Command) line: 1185	
	ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, Message, int, int, long) line: 1547	
	ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 473	
	ActiveMQMessageProducer.send(Message) line: 358	
        <thread in my application calling send>

If I then allow the VM to continue, it runs for a short while and then 
the single remaining ActiveMQ Session Task thread blocks:

Thread [ActiveMQ Session Task] (Suspended (breakpoint at line 91 in UsageManager))	
	UsageManager.waitForSpace() line: 91	
	UsageManager.waitForSpace() line: 88	
	Topic.send(ConnectionContext, Message) line: 248	
	ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) line: 305	
	ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) line: 381	
	TransactionBroker.send(ConnectionContext, Message) line: 197	
	AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126	
	CompositeDestinationBroker.send(ConnectionContext, Message) line: 98	
	BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) line: 136	
	TransportConnection.processMessage(Message) line: 449	
	ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604	
	TransportConnection.service(Command) line: 258	
	TransportConnection$1.onCommand(Object) line: 164	
	ResponseCorrelator.onCommand(Object) line: 95	
	MutexTransport(TransportFilter).onCommand(Object) line: 65	
	VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99	
	VMTransportServer$1(VMTransport).oneway(Object) line: 86	
	MutexTransport.oneway(Object) line: 44	
	ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69	
	ResponseCorrelator.request(Object) line: 74	
	ActiveMQConnection.syncSendPacket(Command) line: 1185	
	ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, Message, int, int, long) line: 1547	
	ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 473	
	ActiveMQMessageProducer.send(Message) line: 358	
	
	<onMessage of listener in my application is called. it calls send.>
	
	ActiveMQMessageConsumer.dispatch(MessageDispatch) line: 870	
	ActiveMQSessionExecutor.dispatch(MessageDispatch) line: 99	
	ActiveMQSessionExecutor.iterate() line: 166	
	PooledTaskRunner.runTask() line: 111	
	PooledTaskRunner.access$1(PooledTaskRunner) line: 95	
	PooledTaskRunner$1.run() line: 44	
	ThreadPoolExecutor$Worker.runTask(Runnable) line: 885	
	ThreadPoolExecutor$Worker.run() line: 907	
	Thread.run() line: 619	

There don't seem to be any other ActiveMQ threads around, so I can't 
see how this thread is going to wake up from the

usageMutex.wait();

that it is blocking on.

Any thoughts? Is it wrong to produce straight from a message listener?
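
For reference, this is how I understand a wait of this kind is meant to 
be released. The sketch below is a generic wait/notifyAll gate written in 
plain Java, not the actual UsageManager source; the class 
UsageGateSketch, its increase and decrease methods and the timeout 
parameter are assumptions made for illustration. The point is that the 
waiter only wakes up if some other thread lowers the usage and notifies.

```java
class UsageGateSketch {
    private final Object usageMutex = new Object();
    private final long limit;
    private long used;

    UsageGateSketch(long limit) {
        this.limit = limit;
    }

    /** Records that bytes have been taken up, e.g. by a newly accepted message. */
    void increase(long bytes) {
        synchronized (usageMutex) {
            used += bytes;
        }
    }

    /** Records that bytes have been freed and wakes up any waiting senders. */
    void decrease(long bytes) {
        synchronized (usageMutex) {
            used -= bytes;
            usageMutex.notifyAll();
        }
    }

    /** Waits until usage is below the limit; returns false if timeoutMs elapses first. */
    boolean waitForSpace(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        synchronized (usageMutex) {
            while (used >= limit) { // loop guards against spurious wakeups
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                usageMutex.wait(remaining);
            }
            return true;
        }
    }

    /** Returns true if the wait fails with no releaser and succeeds with one. */
    static boolean demo() throws InterruptedException {
        UsageGateSketch gate = new UsageGateSketch(100);
        gate.increase(100);                            // at the limit
        boolean beforeRelease = gate.waitForSpace(50); // nobody frees space
        Thread releaser = new Thread(() -> gate.decrease(10));
        releaser.start();
        boolean afterRelease = gate.waitForSpace(2000); // wakes once usage drops
        releaser.join();
        return !beforeRelease && afterRelease;
    }
}
```

If the only thread that could call something like decrease is itself 
stuck in the wait, as seems to be the case for the session task here, 
nothing ever wakes it up.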

By the way, I noticed that increasing the memory available to the 
memory manager from 20 MB to 128 MB actually makes the application block 
sooner rather than later. I also experimented with some prefetch policy 
settings, but these don't make much of a difference. Currently, we're 
simply using

vm://localhost?broker.useJmx=true&broker.persistent=false

as our broker URL.

We're using topics throughout. Sessions' acknowledge modes are all set 
to DUPS_OK_ACKNOWLEDGE. We're doing some message selection with string 
and integer properties.

Cheers,

Albert

Re: Blocking on UsageManager.waitForSpace again

Posted by "James.Strachan" <ja...@gmail.com>.
Have you tried setting the UsageManager configuration to something large? Out
of the box it's set to something really tiny.

BTW if you have an issue, please mention the version of the software you are
using.
http://incubator.apache.org/activemq/support.html
James


Albert Strasheim-2 wrote:
> 
> Hello all
> 
> We're having a problem where our application is blocking on 
> UsageManager.waitForSpace for no obvious reason. The symptoms are very 
> similar to what was discussed in this thread:
> 
> http://www.mail-archive.com/activemq-users@geronimo.apache.org/msg06288.html
> 
> We set up a few sessions with producers and consumers that send two types of 
> messages, with all message sizes in the low kB range. We're using 
> non-persistent, non-durable topics to communicate.
> 
> The application runs fine for about 2 minutes and during this time 
> MemoryPercentageUsed (checked with JConsole) remains below 10% the whole 
> time until it suddenly goes up to 100 and the application stops. Our 
> application isn't doing anything different at the point this abrupt increase 
> occurs. It produces and consumes the same messages as fast as the CPU allows 
> the whole time until it blocks.
> 
> Running with the JDK 1.6.0 server VM without any extra arguments, according 
> to JConsole, the maximum heap size is 517 MB. It seems like the hang occurs 
> each time exactly when the used heap size goes over 200 MB. Running 
> with -Xms384m -Xmx512m I see the used heap size go up to 300+ MB then come 
> back down to a little more than 200 MB and then when it reaches 260 MB, 
> it hangs again.
> 
> Running the client VM, the hang happens when the used heap size goes over 
> 300 MB, each time.
> 
> I'm using Windows XP SP2 with a Core 2 Duo processor.
> 
> Any thoughts?
> 
> Cheers,
> 
> Albert Strasheim
> 
> P. S. JConsole doesn't report any deadlocks in ActiveMQ or our application, 
> so it looks like it really is blocking on UsageManager.waitForSpace.
> 
> 
> 
> 
