Posted to users@activemq.apache.org by Rob Davies <ra...@gmail.com> on 2010/02/12 22:11:16 UTC

Re: How to configure 5.3 broker over KahaDB to support lots of unconsumed persistent msgs?

Hi Scott,

just change the config below to enable flow control, i.e.:

<policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb"/>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"/>

in 5.3, producerFlowControl is on by default, so you can simply remove the
producerFlowControl attribute from your configuration.

If this all sounds like double Dutch, send in your config and we'll help
with the correct settings :)
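
For reference, a complete destinationPolicy block along those lines might look like the sketch below (the 1mb limits and ">" wildcards are illustrative, not recommendations):

```xml
<destinationPolicy>
    <policyMap>
        <policyEntries>
            <!-- producerFlowControl defaults to true in 5.3; shown explicitly for clarity -->
            <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb"/>
            <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"/>
        </policyEntries>
    </policyMap>
</destinationPolicy>
```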


On 12 Feb 2010, at 20:49, scot.hale wrote:

>
> Fred,
>
> Were you able to configure ActiveMQ to grow without surpassing the  
> memory
> setting?  I am trying to figure out how to do the same thing.
>
> -Scot
>
>
> Fred Moore-3 wrote:
>>
>> Hi,
>>
>> going back to Cursors and
>> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
>> ...
>>
>> ...can anyone shed some light on the actual role of memoryLimit in:
>>   <policyEntry topic=">" producerFlowControl="false" memoryLimit="1mb"/>
>>   <policyEntry queue=">" producerFlowControl="false" memoryLimit="1mb"/>
>>
>> ...moreover: *when* will producerFlowControl start slowing down
>> producers?
>>
>> Cheers,
>> F.
>>
>>
>
> -- 
> View this message in context: http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

Rob Davies
http://twitter.com/rajdavies
I work here: http://fusesource.com
My Blog: http://rajdavies.blogspot.com/
I'm writing this: http://www.manning.com/snyder/






Re: How to configure 5.3 broker over KahaDB to support lots of unconsumed persistent msgs?

Posted by Mats Henrikson <ma...@zakalwe.com>.
Gary,

I've been trying to do pretty much the same thing that Scot is trying
to do, and I can't get it to work either: no matter what I do, I seem
to be able to blow the broker up with an OOME.

What I want to do is configure my broker so that it becomes impossible
to run it out of memory or lock it up; messages should just continue
to be saved to disk until we hit the <storeUsage> limit.

My interpretation of what you write in this thread is that if I make
the <memoryUsage> limit something sensible like 100 mb, and set the
queue <policyEntry> memoryLimit to something higher than 100 mb,
messages should go in the store until the store is full.

However, with that configuration the latest ActiveMQ 5.3.1 snapshot
(Thu Jan 28) ignores the <memoryUsage> setting, and if the
<policyEntry> memoryLimit is set high enough it will quite happily
run out of heap space. If the memoryLimit is set below the heap space
limit (-Xmx), then at some point (it appears to always be the same
number of messages in my test) the broker just locks up: the producer
stops sending messages, I can't connect a consumer to get any messages
off, and JMX stops returning any information.

My test configuration is this:

        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue=">" memoryLimit="10gb"
                                 optimizedDispatch="true" producerFlowControl="false">
                        <pendingQueuePolicy>
                            <fileQueueCursor/>
                        </pendingQueuePolicy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <persistenceAdapter>
            <!-- while testing, turn off journal disk syncs so the tests run quicker -->
            <kahaDB directory="/var/spool/activemq/kahadb"
                    enableJournalDiskSyncs="false"/>
        </persistenceAdapter>

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="100 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="20 gb" name="store"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="tcp" uri="tcp://localhost:61616"/>
        </transportConnectors>


I also set the following in /etc/activemq.conf:

ACTIVEMQ_OPTS="-Xmx1500m -XX:MaxPermSize=256m -Dorg.apache.activemq.UseDedicatedTaskRunner=false"

I would be very interested in hearing what I am doing wrong.

Regards,

Mats Henrikson



On 16 February 2010 08:29, Gary Tully <ga...@gmail.com> wrote:
> First, you need to use the FilePendingQueueMessageStoragePolicy, as that
> will off-load message references to the file system when the
> SystemUsage.MemoryUsage limit is reached.
>
> So 1) add the following to the broker policy entry:
>        PendingQueueMessageStoragePolicy pendingQueuePolicy =
>                new FilePendingQueueMessageStoragePolicy();
>        policy.setPendingQueuePolicy(pendingQueuePolicy);
>
> With flow control on, you need to configure a lower SystemUsage, as the use
> of disk space by the file-based cursors is governed by the shared
> SystemUsage.memoryLimit, which by default is the same value as the memory
> limit for a destination. With a single destination, flow control kicks
> in before the system usage limit, so no spooling to disk occurs.
>
> 2) Configure a SystemUsage.MemoryUsage limit that is less than the default
> destination memory limit of 64MB:
>        brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 63);
>
> This should do it once you also add a TempUsage limit to implement requirement 5.
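
The two steps quoted above use the programmatic API; an equivalent in broker XML might look roughly like this sketch (element and attribute names assumed to follow the standard activemq.xml schema; the limits are illustrative):

```xml
<destinationPolicy>
    <policyMap>
        <policyEntries>
            <!-- step 1: a file-based pending-message (cursor) policy for queues -->
            <policyEntry queue=">" memoryLimit="64mb">
                <pendingQueuePolicy>
                    <fileQueueCursor/>
                </pendingQueuePolicy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

<systemUsage>
    <systemUsage>
        <!-- step 2: a system-wide memory limit below the 64MB per-destination default -->
        <memoryUsage>
            <memoryUsage limit="63 mb"/>
        </memoryUsage>
    </systemUsage>
</systemUsage>
```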

Re: How to configure 5.3 broker over KahaDB to support lots of unconsumed persistent msgs?

Posted by Mats Henrikson <ma...@zakalwe.com>.
On 17 February 2010 16:51, Mats Henrikson <ma...@zakalwe.com> wrote:
> When that limit is set, the broker just freezes after some number of
> messages; in JMX/JConsole the queue no longer renders its attributes,
> you can't connect a consumer, etc.

I have logged a bug for this problem:

https://issues.apache.org/activemq/browse/AMQ-2618

Mats

Re: How to configure 5.3 broker over KahaDB to support lots of unconsumed persistent msgs?

Posted by Mats Henrikson <ma...@zakalwe.com>.
Hi Gary,

I checked out your test case and, seeing that it worked fine, set out
to figure out why my own test case didn't.

As it turns out, it is because of my <tempUsage> setting:

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="100 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="20 gb" name="store"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

When I comment out the <tempUsage> everything works fine.

Your test case exhibits the same behaviour if you also set:

        brokerService.getSystemUsage().getTempUsage().setLimit(10 * 1024 * 1024);

When that limit is set, the broker just freezes after some number of
messages; in JMX/JConsole the queue no longer renders its attributes,
you can't connect a consumer, etc.

Is that expected behaviour?

I also found that with my current configuration I don't seem to need
to tweak cursorMemoryHighWaterMark at all; everything seems to work
fine without it for my settings (queue memoryLimit=10mb,
producerFlowControl=true, memoryUsage limit=100mb, storeUsage
limit=20gb).
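
Spelled out as broker XML, the combination reported working above would look roughly like this sketch (limits taken from the message; note that no <tempUsage> limit is set, since setting one triggered the freeze described earlier):

```xml
<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry queue=">" memoryLimit="10mb" producerFlowControl="true">
                <pendingQueuePolicy>
                    <fileQueueCursor/>
                </pendingQueuePolicy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

<systemUsage>
    <systemUsage>
        <memoryUsage>
            <memoryUsage limit="100 mb"/>
        </memoryUsage>
        <storeUsage>
            <storeUsage limit="20 gb" name="store"/>
        </storeUsage>
    </systemUsage>
</systemUsage>
```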

Regards,

Mats Henrikson



On 17 February 2010 02:15, Gary Tully <ga...@gmail.com> wrote:
> Ok, there is a problem and a workaround; see:
> https://issues.apache.org/activemq/browse/AMQ-2610
> The test case [1] attached to the JIRA works as expected with the workaround.
> The test is based on the code posted by Scot.
>
> As the file cursor and the queue share the same usage (they are split for
> VM cursors), the key to ensuring the cursor limit kicks in before the queue
> memory limit (and producer flow control) is to configure a cursor high-water
> mark, policy.setCursorMemoryHighWaterMark(50), which is less than the 70%
> value used by the Queue. This ensures that messages are spooled to disk once
> 50% of the system usage is reached.
>
> Have a peek at the test case, [1]
> https://issues.apache.org/activemq/secure/attachment/18932/UnlimitedEnqueueTest.java
>
>
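
In XML terms, the workaround quoted above might be sketched as follows (cursorMemoryHighWaterMark is the attribute counterpart of policy.setCursorMemoryHighWaterMark(50); the queue pattern is illustrative):

```xml
<policyEntry queue=">" cursorMemoryHighWaterMark="50">
    <pendingQueuePolicy>
        <fileQueueCursor/>
    </pendingQueuePolicy>
</policyEntry>
```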

Re: How to configure 5.3 broker over KahaDB to support lots of unconsumed persistent msgs?

Posted by "scot.hale" <sc...@gmail.com>.
I went back to 5.1, and this behavior seems to work well by default with
the KahaPersistenceAdapter.  I assume it is using the store-based cursor.
I didn't configure a PolicyEntry.  Memory usage is capped at 64MB, which
is the default cap.  I will use 5.1 as my workaround for now.



scot.hale wrote:
> 
> Should this work in 5.3.0, or do we need to wait for 5.4.0 to be released?  
> 
> 
> 
> 
> 
> Gary Tully wrote:
>> 
>> Ok, there is a problem and a workaround; see:
>> https://issues.apache.org/activemq/browse/AMQ-2610
>> The test case [1] attached to the JIRA works as expected with the workaround.
>> The test is based on the code posted by Scot.
>>
>> As the file cursor and the queue share the same usage (they are split for
>> VM cursors), the key to ensuring the cursor limit kicks in before the queue
>> memory limit (and producer flow control) is to configure a cursor high-water
>> mark, policy.setCursorMemoryHighWaterMark(50), which is less than the 70%
>> value used by the Queue. This ensures that messages are spooled to disk once
>> 50% of the system usage is reached.
>> 
>> Have a peek at the test case, [1]
>> https://issues.apache.org/activemq/secure/attachment/18932/UnlimitedEnqueueTest.java
>> 
>> 
>> On 16 February 2010 11:38, Gary Tully <ga...@gmail.com> wrote:
>> 
>>> There is something not right here. let me build a test case to
>>> investigate
>>> a bit.
>>>
>>>
>>> On 16 February 2010 01:19, scot.hale <sc...@gmail.com> wrote:
>>>
>>>>
>>>> A.) I tried using the FilePendingQueueMessageStoragePolicy.  I assume
>>>> that
>>>> this needs to be added to the queue destination policy specifically.
>>>> However I added it to default and Topic just to be sure (not shown
>>>> here).
>>>>
>>>> I turned on flow control, but was unable to figure out what memory
>>>> settings
>>>> are needed.  What I gathered from your post is that I need to set the
>>>> queue
>>>> destination memory limit higher than the default SystemUsage memory
>>>> limit.
>>>> Is that right?  For example:
>>>>
>>>>
>>>>
>>>>
>>>> brokerService.getSystemUsage().getMemoryUsage().setLimit(32*1024*1024);
>>>>
>>>> brokerService.getSystemUsage().getTempUsage().setLimit(128*1024*1024);
>>>>
>>>>        PolicyMap policyMap = new PolicyMap();
>>>>
>>>>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>>>>
>>>>        PolicyEntry policy = new PolicyEntry();
>>>>        policy.setProducerFlowControl(true);
>>>>         policy.setPendingQueuePolicy(new
>>>> FilePendingQueueMessageStoragePolicy());
>>>>        policy.setQueue(">");
>>>>        policy.setMemoryLimit(64*1024*1024);
>>>>         entries.add(policy);
>>>>        policyMap.setPolicyEntries(entries);
>>>>
>>>>        brokerService.setDestinationPolicy(policyMap);
>>>>
>>>>
>>>> I tried it the other way around as well and it still stops (meaning
>>>> producers are blocked or resourceAllocationExceptions are thrown from
>>>> Queue.send()) when it gets to the lower of the two memory limits. I am
>>>> definitely missing something.
>>>>
>>>> B.) Would using the StorePendingQueueMessageStoragePolicy provide the
>>>> same
>>>> behavior I am looking for?
>>>>
>>>> C.) I didn't understand the last sentence in your post.  Does it mean
>>>> that brokerService.getSystemUsage().getTempUsage() is the disk usage
>>>> limit that should generate ResourceAllocationExceptions (assuming
>>>> SendFailIfNoSpace is set to true)?  In my configuration, that would
>>>> mean that once the 128MB is used up by the temp cursor references on
>>>> disk, no more resources are available?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Gary Tully wrote:
>>>> >
>>>> > First, you need to use the FilePendingQueueMessageStoragePolicy, as
>>>> > that will off-load message references to the file system when the
>>>> > SystemUsage.MemoryUsage limit is reached.
>>>> >
>>>> > So 1) add the following to the broker policy entry:
>>>> >         PendingQueueMessageStoragePolicy pendingQueuePolicy =
>>>> >                 new FilePendingQueueMessageStoragePolicy();
>>>> >         policy.setPendingQueuePolicy(pendingQueuePolicy);
>>>> >
>>>> > With flow control on, you need to configure a lower SystemUsage, as
>>>> > the use of disk space by the file-based cursors is governed by the
>>>> > shared SystemUsage.memoryLimit, which by default is the same value as
>>>> > the memory limit for a destination. With a single destination, flow
>>>> > control kicks in before the system usage limit, so no spooling to
>>>> > disk occurs.
>>>> >
>>>> > 2) Configure a SystemUsage.MemoryUsage limit that is less than the
>>>> > default destination memory limit of 64MB:
>>>> >    brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 63);
>>>> >
>>>> > This should do it once you also add a TempUsage limit to implement
>>>> > requirement 5.
>>>> >
>>>> >
>>>> > On 15 February 2010 17:22, scot.hale <sc...@gmail.com> wrote:
>>>> >
>>>> >>
>>>> >> I am trying to set up a queue with the following requirements:
>>>> >>
>>>> >> ActiveMQ 5.1 or 5.3 (I have been testing with 5.3)
>>>> >> 1.) VM transport
>>>> >> 2.) Persistent with KahaPersistenceAdapter
>>>> >> 4.) JVM memory usage is capped at something like 64MB
>>>> >>        - When this limit is reached the producers should continue to
>>>> >> store incoming messages to disk (StoreBasedCursor or FileBasedCursor
>>>> >> will work; since the former is the default, that is the one I have
>>>> >> been using.)
>>>> >> 5.) File system usage is capped at something like 10GB
>>>> >>        - When this limit is reached the broker should start throwing
>>>> >> javax.jms.ResourceAllocationException to the producers
>>>> >>
>>>> >> Number 5 is the least important, as it will be difficult to fill up
>>>> >> disk space in production. My current setup configures ActiveMQ
>>>> >> programmatically. I don't think this is introducing problems; let me
>>>> >> know if there are issues with programmatic configuration.
>>>> >>
>>>> >> Default settings:
>>>> >>        If I do not configure the SystemUsage or flow control, the
>>>> >> 64MB default memory usage limit is reached and the producers are
>>>> >> halted, even though the queues are persistent and have much more
>>>> >> space.  Should the default StoreBasedCursor behave this way?
>>>> >>
>>>> >> Turn off flow control:
>>>> >>        When I turn off flow control with default SystemUsage
>>>> >> settings, the JVM memory is not capped. After about 5 million
>>>> >> messages with no consumers, the JVM, assigned 1GB of memory, starts
>>>> >> throwing OutOfMemoryErrors.
>>>> >>
>>>> >> So what setting do I need to cap the memory and allow the messages
>>>> >> to be stored to disk even when the cap is reached?
>>>> >>
>>>> >>
>>>> >> This is how I programmatically configure my BrokerService:
>>>> >>
>>>> >>        // only way to set the HashIndex bin size for KahaPersistenceAdapter
>>>> >>        System.setProperty("defaultBinSize", "16384");
>>>> >>        try {
>>>> >>            uri = new URI("vm://"+brokerName);
>>>> >>        } catch (URISyntaxException e) {
>>>> >>            throw new RuntimeException(e);
>>>> >>        }
>>>> >>        brokerService = new BrokerService();
>>>> >>        brokerService.setBrokerName(brokerName);
>>>> >>        brokerService.setUseJmx(true);
>>>> >>        brokerService.setUseLoggingForShutdownErrors(true);
>>>> >>
>>>> >>
>>>> >>        PolicyMap policyMap = new PolicyMap();
>>>> >>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>>>> >>        PolicyEntry policy = new PolicyEntry();
>>>> >>        policy.setProducerFlowControl(true);
>>>> >>        policy.setQueue(">");
>>>> >>        entries.add(policy);
>>>> >>        policyMap.setPolicyEntries(entries);
>>>> >>        brokerService.setDestinationPolicy(policyMap);
>>>> >>
>>>> >>        //PERSISTENCE
>>>> >>        brokerService.setPersistent(true);
>>>> >>        KahaPersistenceAdapter persistenceAdapter = new
>>>> >> KahaPersistenceAdapter();
>>>> >>        persistenceAdapter.setDirectory(new
>>>> >> File("/tmp/activemq-"+brokerName+"/kaha"));
>>>> >>        brokerService.setDataDirectoryFile(new
>>>> >> File("/tmp/activemq-"+brokerName+"/data"));
>>>> >>        brokerService.setTmpDataDirectory(new
>>>> >> File("/tmp/activemq-"+brokerName+"/temp"));
>>>> >>        persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
>>>> >>
>>>> >>        try {
>>>> >>            brokerService.setPersistenceAdapter(persistenceAdapter);
>>>> >>        } catch (IOException e) {
>>>> >>            throw new RuntimeException(e);
>>>> >>        }
>>>> >>        try {
>>>> >>            brokerService.getSystemUsage().setSendFailIfNoSpace(true);
>>>> >>            brokerService.addConnector(uri);
>>>> >>            brokerService.start();
>>>> >>        } catch (Exception e) {
>>>> >>            throw new RuntimeException(e);
>>>> >>        }
>>>> >>
>>>> >>
>>>> >>
>>>> >> Here is a Producer:
>>>> >>
>>>> >> public class Producer implements Runnable{
>>>> >>
>>>> >>    private BrokerService brokerService;
>>>> >>    private long numberOfMessages;
>>>> >>
>>>> >>    public Producer(BrokerService brokerService, long n){
>>>> >>        this.brokerService = brokerService;
>>>> >>        this.numberOfMessages = n;
>>>> >>    }
>>>> >>
>>>> >>    public void run(){
>>>> >>        ActiveMQConnectionFactory factory = new
>>>> >> ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
>>>> >>        try {
>>>> >>            Connection conn = factory.createConnection();
>>>> >>            conn.start();
>>>> >>            for (int i = 0; i < numberOfMessages; i++) {
>>>> >>                Session session = conn.createSession(false,
>>>> >> Session.AUTO_ACKNOWLEDGE);
>>>> >>                Destination destination =
>>>> >> session.createQueue("test-queue");
>>>> >>                MessageProducer producer =
>>>> >> session.createProducer(destination);
>>>> >>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>>>> >>                BytesMessage message = session.createBytesMessage();
>>>> >>                message.writeBytes(new byte[]{0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,});
>>>> >>                try {
>>>> >>                    producer.send(message);
>>>> >>                } catch (ResourceAllocationException e) {
>>>> >>                    e.printStackTrace();
>>>> >>                }
>>>> >>                session.close();
>>>> >>            }
>>>> >>        } catch (JMSException e) {
>>>> >>            throw new RuntimeException(e);
>>>> >>         }
>>>> >>
>>>> >>    }
>>>> >> }
>>>> >>
>>>> >>

-- 
View this message in context: http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27617794.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: How to configure 5.3 broker over KahaDB to support lots of unconsumed persistent msgs?

Posted by "scot.hale" <sc...@gmail.com>.
Should this work in 5.3.0, or do we need to wait for 5.4.0 to be released?  





Gary Tully wrote:
> 
> Ok, there is a problem and a workaround; see:
> https://issues.apache.org/activemq/browse/AMQ-2610
> The test case [1] attached to the JIRA works as expected with the workaround.
> The test is based on the code posted by Scot.
>
> As the file cursor and the queue share the same usage (they are split for
> VM cursors), the key to ensuring the cursor limit kicks in before the queue
> memory limit (and producer flow control) is to configure a cursor high-water
> mark, policy.setCursorMemoryHighWaterMark(50), which is less than the 70%
> value used by the Queue. This ensures that messages are spooled to disk once
> 50% of the system usage is reached.
> 
> Have a peek at the test case, [1]
> https://issues.apache.org/activemq/secure/attachment/18932/UnlimitedEnqueueTest.java
> 
>>> help
>>> >> > with the correct settings :)
>>> >> >
>>> >> >
>>> >> > On 12 Feb 2010, at 20:49, scot.hale wrote:
>>> >> >
>>> >> >>
>>> >> >> Fred,
>>> >> >>
>>> >> >> Were you able to configure ActiveMQ to grow without surpassing the
>>> >> >> memory
>>> >> >> setting?  I am trying to figure out how to do the same thing.
>>> >> >>
>>> >> >> -Scot
>>> >> >>
>>> >> >>
>>> >> >> Fred Moore-3 wrote:
>>> >> >>>
>>> >> >>> Hi,
>>> >> >>>
>>> >> >>> going back to Cursors and
>>> >> >>>
>>> >>
>>> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
>>> >> >>> ...
>>> >> >>>
>>> >> >>> ...can anyone shed some light on the actual role of memoryLimit
>>> in:
>>> >> >>>   <policyEntry topic=">" producerFlowControl="false"
>>> >> >>> memoryLimit="1mb">
>>> >> >>>   <policyEntry queue=">" producerFlowControl="false"
>>> >> >>> memoryLimit="1mb">
>>> >> >>>
>>> >> >>> ...moreover: *when* will producerFlowControl start slowing down
>>> >> >>> consumers?
>>> >> >>>
>>> >> >>> Cheers,
>>> >> >>> F.
>>> >> >>>
>>> >> >>>
>>> >> >>
>>> >> >> --
>>> >> >> View this message in context:
>>> >> >>
>>> >>
>>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
>>> >> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>> >> >>
>>> >> >
>>> >> > Rob Davies
>>> >> > http://twitter.com/rajdavies
>>> >> > I work here: http://fusesource.com
>>> >> > My Blog: http://rajdavies.blogspot.com/
>>> >> > I'm writing this: http://www.manning.com/snyder/
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >>
>>> >> --
>>> >> View this message in context:
>>> >>
>>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27597050.html
>>> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>> >>
>>> >>
>>> >
>>> >
>>> > --
>>> > http://blog.garytully.com
>>> >
>>> > Open Source Integration
>>> > http://fusesource.com
>>> >
>>> >
>>>
>>> --
>>> View this message in context:
>>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27602503.html
>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>>
>>>
>>
>>
>> --
>> http://blog.garytully.com
>>
>> Open Source Integration
>> http://fusesource.com
>>
> 
> 
> 
> -- 
> http://blog.garytully.com
> 
> Open Source Integration
> http://fusesource.com
> 
> 

-- 
View this message in context: http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27615569.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: How to configure 5.3 broker over KahaDB to support lots of unconsumed persistent msgs?

Posted by Gary Tully <ga...@gmail.com>.
OK, there is a problem and a workaround; see:
https://issues.apache.org/activemq/browse/AMQ-2610
The test case[1] attached to the JIRA works as expected with the workaround.
The test is based on the code posted by Scot.

Because the file cursor and the queue share the same usage (they are split for
VM cursors), the key to ensuring the cursor limit kicks in before the queue
memory limit (and producer flow control) is to configure a cursor memory high
water mark, policy.setCursorMemoryHighWaterMark(50), that is less than the 70%
value used by the Queue. This ensures the cursor will spool messages to disk
once 50% of the system usage is reached.
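
Put together, the workaround looks roughly like this (a sketch against the 5.3
API; the class and setter names are the ones used in this thread, and
setDefaultEntry is used here for brevity — see the attached test case for the
authoritative version):

```java
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;

public class CursorLimitBroker {
    public static void main(String[] args) throws Exception {
        BrokerService brokerService = new BrokerService();

        PolicyEntry policy = new PolicyEntry();
        policy.setQueue(">");
        policy.setProducerFlowControl(true);
        // Spool pending messages to the file cursor rather than holding them in memory.
        policy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
        // Start spooling at 50% of system usage, below the queue's 70% flow-control point.
        policy.setCursorMemoryHighWaterMark(50);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policy);
        brokerService.setDestinationPolicy(policyMap);

        brokerService.getSystemUsage().setSendFailIfNoSpace(true);
        brokerService.start();
    }
}
```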

Have a peek at the test case, [1]
https://issues.apache.org/activemq/secure/attachment/18932/UnlimitedEnqueueTest.java


On 16 February 2010 11:38, Gary Tully <ga...@gmail.com> wrote:

> There is something not right here. let me build a test case to investigate
> a bit.
>
>
> On 16 February 2010 01:19, scot.hale <sc...@gmail.com> wrote:
>
>>
>> A.) I tried using the FilePendingQueueMessageStoragePolicy.  I assume that
>> this needs to be added to the queue destination policy specifically.
>> However I added it to default and Topic just to be sure (not shown here).
>>
>> I turned on flow control, but was unable to figure out what memory
>> settings
>> are needed.  What I gathered from your post is that I need to set the
>> queue
>> destination memory limit higher than the default SystemUsage memory limit.
>> Is that right?  For example:
>>
>>
>>
>>
>> brokerService.getSystemUsage().getMemoryUsage().setLimit(32*1024*1024);
>>
>> brokerService.getSystemUsage().getTempUsage().setLimit(128*1024*1024);
>>
>>        PolicyMap policyMap = new PolicyMap();
>>
>>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>>
>>        PolicyEntry policy = new PolicyEntry();
>>        policy.setProducerFlowControl(true);
>>         policy.setPendingQueuePolicy(new
>> FilePendingQueueMessageStoragePolicy());
>>        policy.setQueue(">");
>>        policy.setMemoryLimit(64*1024*1024);
>>         entries.add(policy);
>>        policyMap.setPolicyEntries(entries);
>>
>>        brokerService.setDestinationPolicy(policyMap);
>>
>>
>> I tried it the other way around as well and it still stops (meaning
>> producers are blocked or resourceAllocationExceptions are thrown from
>> Queue.send()) when it gets to the lower of the two memory limits. I am
>> definitely missing something.
>>
>> B.) Would using the StorePendingQueueMessageStoragePolicy provide the same
>> behavior I am looking for?
>>
>> C.) I didn't understand the last sentence in your post.  Does this mean
>> that
>> when the brokerService.getSystemUsage().getTempUsage() is the disk usage
>> limit that should generate ResourceAllocationExceptions (assuming
>> SendFailIfNoSpace is set to true)?  In my configureation, it would mean
>> when
>> the 128MB is used up by the temp cursor references on disk, then no more
>> resources are available?
>>
>>
>>
>>
>>
>>
>> Gary Tully wrote:
>> >
>> > First thing is you need to use the FilePendingMessageStoragePolicy() as
>> > that
>> > will off load message references to the file system when
>> > SystemUsage.MemoryUsage limit is reached.
>> >
>> > So 1) add the following to the broker policy entry
>> >         PendingQueueMessageStoragePolicy pendingQueuePolicy = new
>> > FilePendingQueueMessageStoragePolicy();
>> >         policy.setPendingQueuePolicy(pendingQueuePolicy);
>> >
>> > With flow control on, you need to configure a lower SystemUsage as the
>> use
>> > of disk space by the file based cursors is determined by the shared
>> > SystemUsage.memoryLimit, which by default is the same value as the
>> memory
>> > limit for a destination. With a single destination, the flowcontroll
>> kicks
>> > in before the system usage so no spooling to disk occurs.
>> >
>> > 2) Configure a SystemUsage.MemoryLimit that is less than the default
>> > destination memory limit of 64M
>> >    brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024
>> *
>> > 63);
>> >
>> > This should do it once you add a TempStore() limit to implement 5.
>> >
>> >
>> > On 15 February 2010 17:22, scot.hale <sc...@gmail.com> wrote:
>> >
>> >>
>> >> I am trying to setup a queue with the following requirements:
>> >>
>> >>
>> >> ActiveMQ 5.1 or 5.3  ( I have been testing with 5.3 )
>> >> 1. ) VM Transport
>> >> 2. ) Persistent with KahaPersistenceAdaptor
>> >> 4. ) JVM Memory usage is capped at something like 64MB
>> >>        - When this limit is reached the producers should continue to
>> >> store
>> >> incoming messages to disk  (StoreBasedCursor or FileBasedCursor will
>> >> work,
>> >> since the former is the default that is the one I have been using.)
>> >> 5. ) File System usage is capped at something like 10GB
>> >>        - When this limit is reached the producers should start throwing
>> >> javax.jms.ResourceAllocationExceptions to the Producers
>> >>
>> >> Number 5 is the least important, as it will be difficult to fill up
>> disk
>> >> space in production. My current setup configures ActiveMQ
>> >> programatically.
>> >> I don't think this is introducing problems, let me know if there are
>> >> issues
>> >> with programatic configuration.
>> >>
>> >>
>> >> Default settings:
>> >>        If I do not configure the SystemUsage or the Flow control, then
>> >> 64MB
>> >> default memory usage is reached and the producers are halted even
>> though
>> >> the
>> >> queues are persistent and have much more space.  Should the default
>> >> StoreBasedCursor behave this way?
>> >>
>> >>
>> >> Turn off Flow Control:
>> >>        When I turn off Flow Control with default SystemUseage settings,
>> >> then the
>> >> JVM memory is not capped.  After about 5 million messages with no
>> >> consumers
>> >> the JVM assigned 1GB of memory starts returning OutOfMemoryErrors.
>> >>
>> >>
>> >> So what setting do I need to cap the memory and allow the messages to
>> be
>> >> stored to disk even when the cap is reached?
>> >>
>> >>
>> >>
>> >> This is how I programtically configure my BrokerService
>> >>
>> >>        System.setProperty("defaultBinSize", "16384");//Only way to set
>> >> HashIndex bin size for KahaPersistenceAdapter
>> >>        try {
>> >>            uri = new URI("vm://"+brokerName);
>> >>        } catch (URISyntaxException e) {
>> >>            throw new RuntimeException(e);
>> >>        }
>> >>        brokerService = new BrokerService();
>> >>        brokerService.setBrokerName(brokerName);
>> >>        brokerService.setUseJmx(true);
>> >>        brokerService.setUseLoggingForShutdownErrors(true);
>> >>
>> >>
>> >>        PolicyMap policyMap = new PolicyMap();
>> >>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>> >>        PolicyEntry policy = new PolicyEntry();
>> >>        policy.setProducerFlowControl(true);
>> >>        policy.setQueue(">");
>> >>        entries.add(policy);
>> >>        policyMap.setPolicyEntries(entries);
>> >>        brokerService.setDestinationPolicy(policyMap);
>> >>
>> >>        //PERSISTENCE
>> >>        brokerService.setPersistent(true);
>> >>        KahaPersistenceAdapter persistenceAdapter = new
>> >> KahaPersistenceAdapter();
>> >>        persistenceAdapter.setDirectory(new
>> >> File("/tmp/activemq-"+brokerName+"/kaha"));
>> >>        brokerService.setDataDirectoryFile(new
>> >> File("/tmp/activemq-"+brokerName+"/data"));
>> >>        brokerService.setTmpDataDirectory(new
>> >> File("/tmp/activemq-"+brokerName+"/temp"));
>> >>        persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
>> >>
>> >>        try {
>> >>            brokerService.setPersistenceAdapter(persistenceAdapter);
>> >>        } catch (IOException e) {
>> >>            throw new RuntimeException(e);
>> >>        }
>> >>        try {
>> >>            brokerService.getSystemUsage().setSendFailIfNoSpace(true);
>> >>            brokerService.addConnector(uri);
>> >>            brokerService.start();
>> >>        } catch (Exception e) {
>> >>            throw new RuntimeException(e);
>> >>        }
>> >>
>> >>
>> >>
>> >> Here is a Producer:
>> >>
>> >> public class Producer implements Runnable{
>> >>
>> >>    private BrokerService brokerService;
>> >>    private long numberOfMessages;
>> >>
>> >>    public Producer(BrokerService brokerService, long n){
>> >>        this.brokerService = brokerService;
>> >>        this.numberOfMessages = n;
>> >>    }
>> >>
>> >>    public void run(){
>> >>        ActiveMQConnectionFactory factory = new
>> >> ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
>> >>        try {
>> >>            Connection conn = factory.createConnection();
>> >>            conn.start();
>> >>            for (int i = 0; i < numberOfMessages; i++) {
>> >>                Session session = conn.createSession(false,
>> >> Session.AUTO_ACKNOWLEDGE);
>> >>                Destination destination =
>> >> session.createQueue("test-queue");
>> >>                MessageProducer producer =
>> >> session.createProducer(destination);
>> >>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>> >>                BytesMessage message = session.createBytesMessage();
>> >>                message.writeBytes(new
>> >>
>> >>
>> byte[]{0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,});
>> >>                try {
>> >>                    producer.send(message);
>> >>                } catch (ResourceAllocationException e) {
>> >>                    e.printStackTrace();
>> >>                }
>> >>                session.close();
>> >>            }
>> >>        } catch (JMSException e) {
>> >>            throw new RuntimeException(e);
>> >>         }
>> >>
>> >>    }
>> >> }
>> >>
>> >>
>> >> rajdavies wrote:
>> >> >
>> >> > Hi Scott,
>> >> >
>> >> > just change the below config to enable flow control - i.e:
>> >> >
>> >> > <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
>> >> >   <policyEntry queue=">" producerFlowControl="true"
>> memoryLimit="1mb">
>> >> >
>> >> > in 5.3 - producerFlowControl is on by default - so just remove the
>> >> > producerFlowControl entry from your configuration.
>> >> >
>> >> > If this all sounds double dutch - send in your config - and we'll
>> help
>> >> > with the correct settings :)
>> >> >
>> >> >
>> >> > On 12 Feb 2010, at 20:49, scot.hale wrote:
>> >> >
>> >> >>
>> >> >> Fred,
>> >> >>
>> >> >> Were you able to configure ActiveMQ to grow without surpassing the
>> >> >> memory
>> >> >> setting?  I am trying to figure out how to do the same thing.
>> >> >>
>> >> >> -Scot
>> >> >>
>> >> >>
>> >> >> Fred Moore-3 wrote:
>> >> >>>
>> >> >>> Hi,
>> >> >>>
>> >> >>> going back to Cursors and
>> >> >>>
>> >>
>> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
>> >> >>> ...
>> >> >>>
>> >> >>> ...can anyone shed some light on the actual role of memoryLimit in:
>> >> >>>   <policyEntry topic=">" producerFlowControl="false"
>> >> >>> memoryLimit="1mb">
>> >> >>>   <policyEntry queue=">" producerFlowControl="false"
>> >> >>> memoryLimit="1mb">
>> >> >>>
>> >> >>> ...moreover: *when* will producerFlowControl start slowing down
>> >> >>> consumers?
>> >> >>>
>> >> >>> Cheers,
>> >> >>> F.
>> >> >>>
>> >> >>>
>> >> >>
>> >> >> --
>> >> >> View this message in context:
>> >> >>
>> >>
>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
>> >> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> >> >>
>> >> >
>> >> > Rob Davies
>> >> > http://twitter.com/rajdavies
>> >> > I work here: http://fusesource.com
>> >> > My Blog: http://rajdavies.blogspot.com/
>> >> > I'm writing this: http://www.manning.com/snyder/
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >>
>> >> --
>> >> View this message in context:
>> >>
>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27597050.html
>> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> >>
>> >>
>> >
>> >
>> > --
>> > http://blog.garytully.com
>> >
>> > Open Source Integration
>> > http://fusesource.com
>> >
>> >
>>
>> --
>> View this message in context:
>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27602503.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
>
>
> --
> http://blog.garytully.com
>
> Open Source Integration
> http://fusesource.com
>



-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Re: How to configure 5.3 broker over KahaDB to support lots of unconsumed persistent msgs?

Posted by Gary Tully <ga...@gmail.com>.
There is something not right here. Let me build a test case to investigate a
bit.

On 16 February 2010 01:19, scot.hale <sc...@gmail.com> wrote:

>
> A.) I tried using the FilePendingQueueMessageStoragePolicy.  I assume that
> this needs to be added to the queue destination policy specifically.
> However I added it to default and Topic just to be sure (not shown here).
>
> I turned on flow control, but was unable to figure out what memory settings
> are needed.  What I gathered from your post is that I need to set the queue
> destination memory limit higher than the default SystemUsage memory limit.
> Is that right?  For example:
>
>
>
>
> brokerService.getSystemUsage().getMemoryUsage().setLimit(32*1024*1024);
>
> brokerService.getSystemUsage().getTempUsage().setLimit(128*1024*1024);
>
>        PolicyMap policyMap = new PolicyMap();
>
>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>
>        PolicyEntry policy = new PolicyEntry();
>        policy.setProducerFlowControl(true);
>         policy.setPendingQueuePolicy(new
> FilePendingQueueMessageStoragePolicy());
>        policy.setQueue(">");
>        policy.setMemoryLimit(64*1024*1024);
>         entries.add(policy);
>        policyMap.setPolicyEntries(entries);
>
>        brokerService.setDestinationPolicy(policyMap);
>
>
> I tried it the other way around as well and it still stops (meaning
> producers are blocked or resourceAllocationExceptions are thrown from
> Queue.send()) when it gets to the lower of the two memory limits. I am
> definitely missing something.
>
> B.) Would using the StorePendingQueueMessageStoragePolicy provide the same
> behavior I am looking for?
>
> C.) I didn't understand the last sentence in your post.  Does this mean
> that
> when the brokerService.getSystemUsage().getTempUsage() is the disk usage
> limit that should generate ResourceAllocationExceptions (assuming
> SendFailIfNoSpace is set to true)?  In my configureation, it would mean
> when
> the 128MB is used up by the temp cursor references on disk, then no more
> resources are available?
>
>
>
>
>
>
> Gary Tully wrote:
> >
> > First thing is you need to use the FilePendingMessageStoragePolicy() as
> > that
> > will off load message references to the file system when
> > SystemUsage.MemoryUsage limit is reached.
> >
> > So 1) add the following to the broker policy entry
> >         PendingQueueMessageStoragePolicy pendingQueuePolicy = new
> > FilePendingQueueMessageStoragePolicy();
> >         policy.setPendingQueuePolicy(pendingQueuePolicy);
> >
> > With flow control on, you need to configure a lower SystemUsage as the
> use
> > of disk space by the file based cursors is determined by the shared
> > SystemUsage.memoryLimit, which by default is the same value as the memory
> > limit for a destination. With a single destination, the flowcontroll
> kicks
> > in before the system usage so no spooling to disk occurs.
> >
> > 2) Configure a SystemUsage.MemoryLimit that is less than the default
> > destination memory limit of 64M
> >    brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 *
> > 63);
> >
> > This should do it once you add a TempStore() limit to implement 5.
> >
> >
> > On 15 February 2010 17:22, scot.hale <sc...@gmail.com> wrote:
> >
> >>
> >> I am trying to setup a queue with the following requirements:
> >>
> >>
> >> ActiveMQ 5.1 or 5.3  ( I have been testing with 5.3 )
> >> 1. ) VM Transport
> >> 2. ) Persistent with KahaPersistenceAdaptor
> >> 4. ) JVM Memory usage is capped at something like 64MB
> >>        - When this limit is reached the producers should continue to
> >> store
> >> incoming messages to disk  (StoreBasedCursor or FileBasedCursor will
> >> work,
> >> since the former is the default that is the one I have been using.)
> >> 5. ) File System usage is capped at something like 10GB
> >>        - When this limit is reached the producers should start throwing
> >> javax.jms.ResourceAllocationExceptions to the Producers
> >>
> >> Number 5 is the least important, as it will be difficult to fill up disk
> >> space in production. My current setup configures ActiveMQ
> >> programatically.
> >> I don't think this is introducing problems, let me know if there are
> >> issues
> >> with programatic configuration.
> >>
> >>
> >> Default settings:
> >>        If I do not configure the SystemUsage or the Flow control, then
> >> 64MB
> >> default memory usage is reached and the producers are halted even though
> >> the
> >> queues are persistent and have much more space.  Should the default
> >> StoreBasedCursor behave this way?
> >>
> >>
> >> Turn off Flow Control:
> >>        When I turn off Flow Control with default SystemUseage settings,
> >> then the
> >> JVM memory is not capped.  After about 5 million messages with no
> >> consumers
> >> the JVM assigned 1GB of memory starts returning OutOfMemoryErrors.
> >>
> >>
> >> So what setting do I need to cap the memory and allow the messages to be
> >> stored to disk even when the cap is reached?
> >>
> >>
> >>
> >> This is how I programtically configure my BrokerService
> >>
> >>        System.setProperty("defaultBinSize", "16384");//Only way to set
> >> HashIndex bin size for KahaPersistenceAdapter
> >>        try {
> >>            uri = new URI("vm://"+brokerName);
> >>        } catch (URISyntaxException e) {
> >>            throw new RuntimeException(e);
> >>        }
> >>        brokerService = new BrokerService();
> >>        brokerService.setBrokerName(brokerName);
> >>        brokerService.setUseJmx(true);
> >>        brokerService.setUseLoggingForShutdownErrors(true);
> >>
> >>
> >>        PolicyMap policyMap = new PolicyMap();
> >>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
> >>        PolicyEntry policy = new PolicyEntry();
> >>        policy.setProducerFlowControl(true);
> >>        policy.setQueue(">");
> >>        entries.add(policy);
> >>        policyMap.setPolicyEntries(entries);
> >>        brokerService.setDestinationPolicy(policyMap);
> >>
> >>        //PERSISTENCE
> >>        brokerService.setPersistent(true);
> >>        KahaPersistenceAdapter persistenceAdapter = new
> >> KahaPersistenceAdapter();
> >>        persistenceAdapter.setDirectory(new
> >> File("/tmp/activemq-"+brokerName+"/kaha"));
> >>        brokerService.setDataDirectoryFile(new
> >> File("/tmp/activemq-"+brokerName+"/data"));
> >>        brokerService.setTmpDataDirectory(new
> >> File("/tmp/activemq-"+brokerName+"/temp"));
> >>        persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
> >>
> >>        try {
> >>            brokerService.setPersistenceAdapter(persistenceAdapter);
> >>        } catch (IOException e) {
> >>            throw new RuntimeException(e);
> >>        }
> >>        try {
> >>            brokerService.getSystemUsage().setSendFailIfNoSpace(true);
> >>            brokerService.addConnector(uri);
> >>            brokerService.start();
> >>        } catch (Exception e) {
> >>            throw new RuntimeException(e);
> >>        }
> >>
> >>
> >>
> >> Here is a Producer:
> >>
> >> public class Producer implements Runnable{
> >>
> >>    private BrokerService brokerService;
> >>    private long numberOfMessages;
> >>
> >>    public Producer(BrokerService brokerService, long n){
> >>        this.brokerService = brokerService;
> >>        this.numberOfMessages = n;
> >>    }
> >>
> >>    public void run(){
> >>        ActiveMQConnectionFactory factory = new
> >> ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
> >>        try {
> >>            Connection conn = factory.createConnection();
> >>            conn.start();
> >>            for (int i = 0; i < numberOfMessages; i++) {
> >>                Session session = conn.createSession(false,
> >> Session.AUTO_ACKNOWLEDGE);
> >>                Destination destination =
> >> session.createQueue("test-queue");
> >>                MessageProducer producer =
> >> session.createProducer(destination);
> >>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
> >>                BytesMessage message = session.createBytesMessage();
> >>                message.writeBytes(new
> >>
> >>
> byte[]{0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,});
> >>                try {
> >>                    producer.send(message);
> >>                } catch (ResourceAllocationException e) {
> >>                    e.printStackTrace();
> >>                }
> >>                session.close();
> >>            }
> >>        } catch (JMSException e) {
> >>            throw new RuntimeException(e);
> >>         }
> >>
> >>    }
> >> }
> >>
> >>
> >> rajdavies wrote:
> >> >
> >> > Hi Scott,
> >> >
> >> > just change the below config to enable flow control - i.e:
> >> >
> >> > <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
> >> >   <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
> >> >
> >> > in 5.3 - producerFlowControl is on by default - so just remove the
> >> > producerFlowControl entry from your configuration.
> >> >
> >> > If this all sounds double dutch - send in your config - and we'll help
> >> > with the correct settings :)
> >> >
> >> >
> >> > On 12 Feb 2010, at 20:49, scot.hale wrote:
> >> >
> >> >>
> >> >> Fred,
> >> >>
> >> >> Were you able to configure ActiveMQ to grow without surpassing the
> >> >> memory
> >> >> setting?  I am trying to figure out how to do the same thing.
> >> >>
> >> >> -Scot
> >> >>
> >> >>
> >> >> Fred Moore-3 wrote:
> >> >>>
> >> >>> Hi,
> >> >>>
> >> >>> going back to Cursors and
> >> >>>
> >>
> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
> >> >>> ...
> >> >>>
> >> >>> ...can anyone shed some light on the actual role of memoryLimit in:
> >> >>>   <policyEntry topic=">" producerFlowControl="false"
> >> >>> memoryLimit="1mb">
> >> >>>   <policyEntry queue=">" producerFlowControl="false"
> >> >>> memoryLimit="1mb">
> >> >>>
> >> >>> ...moreover: *when* will producerFlowControl start slowing down
> >> >>> consumers?
> >> >>>
> >> >>> Cheers,
> >> >>> F.
> >> >>>
> >> >>>
> >> >>
> >> >> --
> >> >> View this message in context:
> >> >>
> >>
> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
> >> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >> >>
> >> >
> >> > Rob Davies
> >> > http://twitter.com/rajdavies
> >> > I work here: http://fusesource.com
> >> > My Blog: http://rajdavies.blogspot.com/
> >> > I'm writing this: http://www.manning.com/snyder/
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >> --
> >> View this message in context:
> >>
> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27597050.html
> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >>
> >>
> >
> >
> > --
> > http://blog.garytully.com
> >
> > Open Source Integration
> > http://fusesource.com
> >
> >
>
> --
> View this message in context:
> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27602503.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>


-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Re: How to configure 5.3 broker over KahaDB to support lots of unconsumed persistent msgs?

Posted by "scot.hale" <sc...@gmail.com>.
A.) I tried using the FilePendingQueueMessageStoragePolicy.  I assume that
this needs to be added to the queue destination policy specifically.
However, I added it to the default and topic policies just to be sure (not
shown here).

I turned on flow control, but was unable to figure out what memory settings
are needed.  What I gathered from your post is that I need to set the queue
destination memory limit higher than the default SystemUsage memory limit. 
Is that right?  For example:



       
        brokerService.getSystemUsage().getMemoryUsage().setLimit(32*1024*1024);
        brokerService.getSystemUsage().getTempUsage().setLimit(128*1024*1024);

        PolicyMap policyMap = new PolicyMap();

        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();

        PolicyEntry policy = new PolicyEntry();
        policy.setProducerFlowControl(true);
        policy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
        policy.setQueue(">");
        policy.setMemoryLimit(64*1024*1024);
        entries.add(policy);
        policyMap.setPolicyEntries(entries);

        brokerService.setDestinationPolicy(policyMap);


I tried it the other way around as well, and it still stops (meaning
producers are blocked or ResourceAllocationExceptions are thrown from
Queue.send()) when it reaches the lower of the two memory limits. I am
definitely missing something.

B.) Would using the StorePendingQueueMessageStoragePolicy provide the same
behavior I am looking for?  

C.) I didn't understand the last sentence in your post.  Does it mean that
brokerService.getSystemUsage().getTempUsage() is the disk usage limit that
should generate ResourceAllocationExceptions (assuming SendFailIfNoSpace is
set to true)?  In my configuration, that would mean that once the 128MB is
used up by the temp cursor references on disk, no more resources are
available?
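
To make question C concrete, the disk-side cap being asked about would be set
like this (a sketch only; whether TempUsage really governs the cursor's spool
space, and therefore triggers the exception, is exactly what the question is
asking):

```java
import org.apache.activemq.broker.BrokerService;

public class TempStoreCap {
    public static void main(String[] args) throws Exception {
        BrokerService brokerService = new BrokerService();
        // Assumption under test: exhausting this 128MB temp store, with
        // sendFailIfNoSpace=true, should surface as a
        // ResourceAllocationException on producer.send().
        brokerService.getSystemUsage().getTempUsage().setLimit(128L * 1024 * 1024);
        brokerService.getSystemUsage().setSendFailIfNoSpace(true);
        brokerService.start();
    }
}
```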






Gary Tully wrote:
> 
> First thing is you need to use the FilePendingMessageStoragePolicy() as
> that
> will off load message references to the file system when
> SystemUsage.MemoryUsage limit is reached.
> 
> So 1) add the following to the broker policy entry
>         PendingQueueMessageStoragePolicy pendingQueuePolicy = new
> FilePendingQueueMessageStoragePolicy();
>         policy.setPendingQueuePolicy(pendingQueuePolicy);
> 
> With flow control on, you need to configure a lower SystemUsage limit, as
> the use of disk space by the file-based cursors is determined by the shared
> SystemUsage.memoryLimit, which by default is the same value as the memory
> limit for a destination. With a single destination, flow control kicks
> in before the system usage limit, so no spooling to disk occurs.
> 
> 2) Configure a SystemUsage.MemoryLimit that is less than the default
> destination memory limit of 64M
>    brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 *
> 63);
> 
> This should do it once you add a temp store (TempUsage) limit to implement
> requirement 5.
> 
> 
> On 15 February 2010 17:22, scot.hale <sc...@gmail.com> wrote:
> 
>>
>> I am trying to setup a queue with the following requirements:
>>
>>
>> ActiveMQ 5.1 or 5.3  ( I have been testing with 5.3 )
>> 1. ) VM Transport
>> 2. ) Persistent with KahaPersistenceAdapter
>> 4. ) JVM Memory usage is capped at something like 64MB
>>        - When this limit is reached the producers should continue to
>> store
>> incoming messages to disk  (StoreBasedCursor or FileBasedCursor will
>> work,
>> since the former is the default that is the one I have been using.)
>> 5. ) File System usage is capped at something like 10GB
>>        - When this limit is reached the producers should start throwing
>> javax.jms.ResourceAllocationExceptions to the Producers
>>
>> Number 5 is the least important, as it will be difficult to fill up disk
>> space in production. My current setup configures ActiveMQ
>> programmatically.
>> I don't think this is introducing problems, let me know if there are
>> issues
>> with programmatic configuration.
>>
>>
>> Default settings:
>>        If I do not configure the SystemUsage or the Flow control, then
>> 64MB
>> default memory usage is reached and the producers are halted even though
>> the
>> queues are persistent and have much more space.  Should the default
>> StoreBasedCursor behave this way?
>>
>>
>> Turn off Flow Control:
>>        When I turn off Flow Control with default SystemUsage settings,
>> then the
>> JVM memory is not capped.  After about 5 million messages with no
>> consumers
>> the JVM assigned 1GB of memory starts returning OutOfMemoryErrors.
>>
>>
>> So what setting do I need to cap the memory and allow the messages to be
>> stored to disk even when the cap is reached?
>>
>>
>>
>> This is how I programmatically configure my BrokerService
>>
>>        System.setProperty("defaultBinSize", "16384");//Only way to set
>> HashIndex bin size for KahaPersistenceAdapter
>>        try {
>>            uri = new URI("vm://"+brokerName);
>>        } catch (URISyntaxException e) {
>>            throw new RuntimeException(e);
>>        }
>>        brokerService = new BrokerService();
>>        brokerService.setBrokerName(brokerName);
>>        brokerService.setUseJmx(true);
>>        brokerService.setUseLoggingForShutdownErrors(true);
>>
>>
>>        PolicyMap policyMap = new PolicyMap();
>>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>>        PolicyEntry policy = new PolicyEntry();
>>        policy.setProducerFlowControl(true);
>>        policy.setQueue(">");
>>        entries.add(policy);
>>        policyMap.setPolicyEntries(entries);
>>        brokerService.setDestinationPolicy(policyMap);
>>
>>        //PERSISTENCE
>>        brokerService.setPersistent(true);
>>        KahaPersistenceAdapter persistenceAdapter = new
>> KahaPersistenceAdapter();
>>        persistenceAdapter.setDirectory(new
>> File("/tmp/activemq-"+brokerName+"/kaha"));
>>        brokerService.setDataDirectoryFile(new
>> File("/tmp/activemq-"+brokerName+"/data"));
>>        brokerService.setTmpDataDirectory(new
>> File("/tmp/activemq-"+brokerName+"/temp"));
>>        persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
>>
>>        try {
>>            brokerService.setPersistenceAdapter(persistenceAdapter);
>>        } catch (IOException e) {
>>            throw new RuntimeException(e);
>>        }
>>        try {
>>            brokerService.getSystemUsage().setSendFailIfNoSpace(true);
>>            brokerService.addConnector(uri);
>>            brokerService.start();
>>        } catch (Exception e) {
>>            throw new RuntimeException(e);
>>        }
>>
>>
>>
>> Here is a Producer:
>>
>> public class Producer implements Runnable{
>>
>>    private BrokerService brokerService;
>>    private long numberOfMessages;
>>
>>    public Producer(BrokerService brokerService, long n){
>>        this.brokerService = brokerService;
>>        this.numberOfMessages = n;
>>    }
>>
>>    public void run(){
>>        ActiveMQConnectionFactory factory = new
>> ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
>>        try {
>>            Connection conn = factory.createConnection();
>>            conn.start();
>>            for (int i = 0; i < numberOfMessages; i++) {
>>                Session session = conn.createSession(false,
>> Session.AUTO_ACKNOWLEDGE);
>>                Destination destination =
>> session.createQueue("test-queue");
>>                MessageProducer producer =
>> session.createProducer(destination);
>>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>>                BytesMessage message = session.createBytesMessage();
>>                message.writeBytes(new
>>
>> byte[]{0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,});
>>                try {
>>                    producer.send(message);
>>                } catch (ResourceAllocationException e) {
>>                    e.printStackTrace();
>>                }
>>                session.close();
>>            }
>>        } catch (JMSException e) {
>>            throw new RuntimeException(e);
>>         }
>>
>>    }
>> }
>>
>>
>> rajdavies wrote:
>> >
>> > Hi Scott,
>> >
>> > just change the below config to enable flow control - i.e:
>> >
>> > <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
>> >   <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
>> >
>> > in 5.3 - producerFlowControl is on by default - so just remove the
>> > producerFlowControl entry from your configuration.
>> >
>> > If this all sounds double dutch - send in your config - and we'll help
>> > with the correct settings :)
>> >
>> >
>> > On 12 Feb 2010, at 20:49, scot.hale wrote:
>> >
>> >>
>> >> Fred,
>> >>
>> >> Were you able to configure ActiveMQ to grow without surpassing the
>> >> memory
>> >> setting?  I am trying to figure out how to do the same thing.
>> >>
>> >> -Scot
>> >>
>> >>
>> >> Fred Moore-3 wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> going back to Cursors and
>> >>>
>> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
>> >>> ...
>> >>>
>> >>> ...can anyone shed some light on the actual role of memoryLimit in:
>> >>>   <policyEntry topic=">" producerFlowControl="false"
>> >>> memoryLimit="1mb">
>> >>>   <policyEntry queue=">" producerFlowControl="false"
>> >>> memoryLimit="1mb">
>> >>>
>> >>> ...moreover: *when* will producerFlowControl start slowing down
>> >>> consumers?
>> >>>
>> >>> Cheers,
>> >>> F.
>> >>>
>> >>>
>> >>
>> >> --
>> >> View this message in context:
>> >>
>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
>> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> >>
>> >
>> > Rob Davies
>> > http://twitter.com/rajdavies
>> > I work here: http://fusesource.com
>> > My Blog: http://rajdavies.blogspot.com/
>> > I'm writing this: http://www.manning.com/snyder/
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>>
>> --
>> View this message in context:
>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27597050.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
> 
> 
> -- 
> http://blog.garytully.com
> 
> Open Source Integration
> http://fusesource.com
> 
> 

-- 
View this message in context: http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27602503.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: How to configure 5.3 broker over KahaDB to support lots of unconsumed persistent msgs?

Posted by Gary Tully <ga...@gmail.com>.
First thing is you need to use the FilePendingQueueMessageStoragePolicy as
that will offload message references to the file system when the
SystemUsage.MemoryUsage limit is reached.

So 1) add the following to the broker policy entry
        PendingQueueMessageStoragePolicy pendingQueuePolicy = new
FilePendingQueueMessageStoragePolicy();
        policy.setPendingQueuePolicy(pendingQueuePolicy);

With flow control on, you need to configure a lower SystemUsage limit, as the
use of disk space by the file-based cursors is determined by the shared
SystemUsage.memoryLimit, which by default is the same value as the memory
limit for a destination. With a single destination, flow control kicks
in before the system usage limit, so no spooling to disk occurs.

2) Configure a SystemUsage.MemoryLimit that is less than the default
destination memory limit of 64M
   brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 *
63);

This should do it once you add a temp store (TempUsage) limit to implement
requirement 5.
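For anyone doing this in activemq.xml rather than programmatically, the
equivalent configuration would look roughly like this (a sketch from memory;
double-check the element names and limits against the 5.3 schema):

```xml
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost">
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <!-- spool pending queue messages to disk once memory fills -->
        <policyEntry queue=">" producerFlowControl="true" memoryLimit="64mb">
          <pendingQueuePolicy>
            <fileQueueCursor/>
          </pendingQueuePolicy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  <!-- sendFailIfNoSpace: throw ResourceAllocationException instead of blocking -->
  <systemUsage>
    <systemUsage sendFailIfNoSpace="true">
      <memoryUsage>
        <memoryUsage limit="63mb"/> <!-- below the per-destination 64mb -->
      </memoryUsage>
      <tempUsage>
        <tempUsage limit="10gb"/>   <!-- disk cap for the file cursors -->
      </tempUsage>
    </systemUsage>
  </systemUsage>
</broker>
```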


On 15 February 2010 17:22, scot.hale <sc...@gmail.com> wrote:

>
> I am trying to setup a queue with the following requirements:
>
>
> ActiveMQ 5.1 or 5.3  ( I have been testing with 5.3 )
> 1. ) VM Transport
> 2. ) Persistent with KahaPersistenceAdapter
> 4. ) JVM Memory usage is capped at something like 64MB
>        - When this limit is reached the producers should continue to store
> incoming messages to disk  (StoreBasedCursor or FileBasedCursor will work,
> since the former is the default that is the one I have been using.)
> 5. ) File System usage is capped at something like 10GB
>        - When this limit is reached the producers should start throwing
> javax.jms.ResourceAllocationExceptions to the Producers
>
> Number 5 is the least important, as it will be difficult to fill up disk
> space in production. My current setup configures ActiveMQ programmatically.
> I don't think this is introducing problems, let me know if there are issues
> with programmatic configuration.
>
>
> Default settings:
>        If I do not configure the SystemUsage or the Flow control, then 64MB
> default memory usage is reached and the producers are halted even though
> the
> queues are persistent and have much more space.  Should the default
> StoreBasedCursor behave this way?
>
>
> Turn off Flow Control:
>        When I turn off Flow Control with default SystemUsage settings,
> then the
> JVM memory is not capped.  After about 5 million messages with no consumers
> the JVM assigned 1GB of memory starts returning OutOfMemoryErrors.
>
>
> So what setting do I need to cap the memory and allow the messages to be
> stored to disk even when the cap is reached?
>
>
>
> This is how I programmatically configure my BrokerService
>
>        System.setProperty("defaultBinSize", "16384");//Only way to set
> HashIndex bin size for KahaPersistenceAdapter
>        try {
>            uri = new URI("vm://"+brokerName);
>        } catch (URISyntaxException e) {
>            throw new RuntimeException(e);
>        }
>        brokerService = new BrokerService();
>        brokerService.setBrokerName(brokerName);
>        brokerService.setUseJmx(true);
>        brokerService.setUseLoggingForShutdownErrors(true);
>
>
>        PolicyMap policyMap = new PolicyMap();
>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>        PolicyEntry policy = new PolicyEntry();
>        policy.setProducerFlowControl(true);
>        policy.setQueue(">");
>        entries.add(policy);
>        policyMap.setPolicyEntries(entries);
>        brokerService.setDestinationPolicy(policyMap);
>
>        //PERSISTENCE
>        brokerService.setPersistent(true);
>        KahaPersistenceAdapter persistenceAdapter = new
> KahaPersistenceAdapter();
>        persistenceAdapter.setDirectory(new
> File("/tmp/activemq-"+brokerName+"/kaha"));
>        brokerService.setDataDirectoryFile(new
> File("/tmp/activemq-"+brokerName+"/data"));
>        brokerService.setTmpDataDirectory(new
> File("/tmp/activemq-"+brokerName+"/temp"));
>        persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
>
>        try {
>            brokerService.setPersistenceAdapter(persistenceAdapter);
>        } catch (IOException e) {
>            throw new RuntimeException(e);
>        }
>        try {
>            brokerService.getSystemUsage().setSendFailIfNoSpace(true);
>            brokerService.addConnector(uri);
>            brokerService.start();
>        } catch (Exception e) {
>            throw new RuntimeException(e);
>        }
>
>
>
> Here is a Producer:
>
> public class Producer implements Runnable{
>
>    private BrokerService brokerService;
>    private long numberOfMessages;
>
>    public Producer(BrokerService brokerService, long n){
>        this.brokerService = brokerService;
>        this.numberOfMessages = n;
>    }
>
>    public void run(){
>        ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
>        try {
>            Connection conn = factory.createConnection();
>            conn.start();
>            for (int i = 0; i < numberOfMessages; i++) {
>                Session session = conn.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>                Destination destination = session.createQueue("test-queue");
>                MessageProducer producer =
> session.createProducer(destination);
>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>                BytesMessage message = session.createBytesMessage();
>                message.writeBytes(new
>
> byte[]{0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,});
>                try {
>                    producer.send(message);
>                } catch (ResourceAllocationException e) {
>                    e.printStackTrace();
>                }
>                session.close();
>            }
>        } catch (JMSException e) {
>            throw new RuntimeException(e);
>         }
>
>    }
> }
>
>
> rajdavies wrote:
> >
> > Hi Scott,
> >
> > just change the below config to enable flow control - i.e:
> >
> > <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
> >   <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
> >
> > in 5.3 - producerFlowControl is on by default - so just remove the
> > producerFlowControl entry from your configuration.
> >
> > If this all sounds double dutch - send in your config - and we'll help
> > with the correct settings :)
> >
> >
> > On 12 Feb 2010, at 20:49, scot.hale wrote:
> >
> >>
> >> Fred,
> >>
> >> Were you able to configure ActiveMQ to grow without surpassing the
> >> memory
> >> setting?  I am trying to figure out how to do the same thing.
> >>
> >> -Scot
> >>
> >>
> >> Fred Moore-3 wrote:
> >>>
> >>> Hi,
> >>>
> >>> going back to Cursors and
> >>>
> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
> >>> ...
> >>>
> >>> ...can anyone shed some light on the actual role of memoryLimit in:
> >>>   <policyEntry topic=">" producerFlowControl="false"
> >>> memoryLimit="1mb">
> >>>   <policyEntry queue=">" producerFlowControl="false"
> >>> memoryLimit="1mb">
> >>>
> >>> ...moreover: *when* will producerFlowControl start slowing down
> >>> consumers?
> >>>
> >>> Cheers,
> >>> F.
> >>>
> >>>
> >>
> >> --
> >> View this message in context:
> >>
> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >>
> >
> > Rob Davies
> > http://twitter.com/rajdavies
> > I work here: http://fusesource.com
> > My Blog: http://rajdavies.blogspot.com/
> > I'm writing this: http://www.manning.com/snyder/
> >
> >
> >
> >
> >
> >
> >
>
> --
> View this message in context:
> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27597050.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>


-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Re: How to configure 5.3 broker over KahaDB to support lots of unconsumed persistent msgs?

Posted by "scot.hale" <sc...@gmail.com>.
I am trying to setup a queue with the following requirements:


ActiveMQ 5.1 or 5.3  ( I have been testing with 5.3 )
1. ) VM Transport
2. ) Persistent with KahaPersistenceAdapter
4. ) JVM Memory usage is capped at something like 64MB
	- When this limit is reached the producers should continue to store
incoming messages to disk  (StoreBasedCursor or FileBasedCursor will work;
since the former is the default, that is the one I have been using.)
5. ) File System usage is capped at something like 10GB
	- When this limit is reached the broker should start throwing
javax.jms.ResourceAllocationException to the producers

Number 5 is the least important, as it will be difficult to fill up the disk
space in production. My current setup configures ActiveMQ programmatically.
I don't think this is introducing problems, but let me know if there are
issues with programmatic configuration.


Default settings:
	If I do not configure the SystemUsage or flow control, then the 64MB
default memory limit is reached and the producers are halted even though the
queues are persistent and have much more disk space available.  Should the
default StoreBasedCursor behave this way?


Turn off Flow Control:
	When I turn off flow control with default SystemUsage settings, the
JVM memory is not capped.  After about 5 million messages with no consumers,
the JVM (assigned 1GB of memory) starts throwing OutOfMemoryErrors.


So what setting do I need to cap the memory and allow the messages to be
stored to disk even when the cap is reached?  



This is how I programmatically configure my BrokerService

        System.setProperty("defaultBinSize", "16384");//Only way to set
HashIndex bin size for KahaPersistenceAdapter
        try {
            uri = new URI("vm://"+brokerName);
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
        brokerService = new BrokerService();
        brokerService.setBrokerName(brokerName);
        brokerService.setUseJmx(true);
        brokerService.setUseLoggingForShutdownErrors(true);
        
        
        PolicyMap policyMap = new PolicyMap();        
        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
        PolicyEntry policy = new PolicyEntry();
        policy.setProducerFlowControl(true);
        policy.setQueue(">");
        entries.add(policy);                
        policyMap.setPolicyEntries(entries);
        brokerService.setDestinationPolicy(policyMap);
        
        //PERSISTENCE
        brokerService.setPersistent(true);
        KahaPersistenceAdapter persistenceAdapter = new
KahaPersistenceAdapter();
        persistenceAdapter.setDirectory(new
File("/tmp/activemq-"+brokerName+"/kaha"));
        brokerService.setDataDirectoryFile(new
File("/tmp/activemq-"+brokerName+"/data"));
        brokerService.setTmpDataDirectory(new
File("/tmp/activemq-"+brokerName+"/temp"));
        persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
        
        try {
            brokerService.setPersistenceAdapter(persistenceAdapter);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        try {
            brokerService.getSystemUsage().setSendFailIfNoSpace(true);
            brokerService.addConnector(uri);
            brokerService.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }



Here is a Producer:

public class Producer implements Runnable{
    
    private BrokerService brokerService;   
    private long numberOfMessages;
    
    public Producer(BrokerService brokerService, long n){
        this.brokerService = brokerService;
        this.numberOfMessages = n;
    }
    
    public void run(){
        ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerService.getVmConnectorURI());        
        try {
            Connection conn = factory.createConnection();            
            conn.start();
            for (int i = 0; i < numberOfMessages; i++) {
                Session session = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE);                
                Destination destination = session.createQueue("test-queue");
                MessageProducer producer =
session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(new
byte[]{0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,});
                try {
                    producer.send(message);
                } catch (ResourceAllocationException e) {
                    e.printStackTrace();
                }
                session.close();
            }
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
        
    }
}


rajdavies wrote:
> 
> Hi Scott,
> 
> just change the below config to enable flow control - i.e:
> 
> <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
>   <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
> 
> in 5.3 - producerFlowControl is on by default - so just remove the  
> producerFlowControl entry from your configuration.
> 
> If this all sounds double dutch - send in your config - and we'll help  
> with the correct settings :)
> 
> 
> On 12 Feb 2010, at 20:49, scot.hale wrote:
> 
>>
>> Fred,
>>
>> Were you able to configure ActiveMQ to grow without surpassing the  
>> memory
>> setting?  I am trying to figure out how to do the same thing.
>>
>> -Scot
>>
>>
>> Fred Moore-3 wrote:
>>>
>>> Hi,
>>>
>>> going back to Cursors and
>>> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
>>> ...
>>>
>>> ...can anyone shed some light on the actual role of memoryLimit in:
>>>   <policyEntry topic=">" producerFlowControl="false"  
>>> memoryLimit="1mb">
>>>   <policyEntry queue=">" producerFlowControl="false"  
>>> memoryLimit="1mb">
>>>
>>> ...moreover: *when* will producerFlowControl start slowing down  
>>> consumers?
>>>
>>> Cheers,
>>> F.
>>>
>>>
>>
>> -- 
>> View this message in context:
>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
> 
> Rob Davies
> http://twitter.com/rajdavies
> I work here: http://fusesource.com
> My Blog: http://rajdavies.blogspot.com/
> I'm writing this: http://www.manning.com/snyder/
> 
> 
> 
> 
> 
> 
> 

-- 
View this message in context: http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27597050.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.