You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Klaus Pittig <kl...@futura4retail.com> on 2016/01/08 16:27:27 UTC

How to avoid blocking of queue browsing after ActiveMQ checkpoint call

(related issue: https://issues.apache.org/jira/browse/AMQ-6115)

There's a problem when Using ActiveMQ with a large number of Persistence
Queues (250) á 1000 persistent TextMessages á 10 KB.

Our scenario requires these messages to remain in the storage over a
long time (days), until they are consumed (large amounts of data are
staged for distribution for many consumer, that could be offline for
some days).


After the Persistence Store is filled with these Messages and after a
broker restart we can browse/consume some Queues  _until_ the
#checkpoint call after 30 seconds.

This call causes the broker to use all available memory and never
releases it for other tasks such as Queue browse/consume. Internally the
MessageCursor seems to decide, that there is not enough memory and stops
delivery of queue content to browsers/consumers.


=> Is there a way to avoid this behaviour by configuration or is this a bug?

The expectation is, that we can consume/browse any queue under all
circumstances.

Settings below are in production for some time now and several
recommendations are applied found in the ActiveMQ documentation
(destination policies, systemUsage, persistence store options etc.)

 - Behaviour is tested with ActiveMQ: 5.11.2, 5.13.0 and 5.5.1.
 - Memory Settings: Xmx=1024m
 - Java: 1.8 or 1.7
 - OS: Windows, MacOS, Linux
 - PersistenceAdapter: KahaDB or LevelDB
 - Disc: enough free space (200 GB) and physical memory (16 GB max).

Besides the above mentioned settings we use the following settings for
the broker (btw: changing the memoryLimit to a lower value like 1mb does
not change the situation):

    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry queue=">" producerFlowControl="false"
optimizedDispatch="true" memoryLimit="128mb"
timeBeforeDispatchStarts="1000">
                    <dispatchPolicy>
                        <strictOrderDispatchPolicy />
                    </dispatchPolicy>
                    <pendingQueuePolicy>
                        <storeCursor />
                    </pendingQueuePolicy>
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>
    <systemUsage>
        <systemUsage sendFailIfNoSpace="true">
            <memoryUsage>
                <memoryUsage limit="50 mb" />
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="80000 mb" />
            </storeUsage>
            <tempUsage>
                <tempUsage limit="1000 mb" />
            </tempUsage>
        </systemUsage>
    </systemUsage>

If we set the **cursorMemoryHighWaterMark** in the destinationPolicy to
a higher value like **150** or **600** depending on the difference
between memoryUsage and the available heap space relieves the situation
a bit for a workaround, but this is not really an option for production
systems in my point of view.

Screenie with information from Oracle Mission Control showing those
ActiveMQTextMessage instances that are never released from memory:

http://goo.gl/EjEixV


Cheers
Klaus

Re: How to avoid blocking of queue browsing after ActiveMQ checkpoint call

Posted by Tim Bain <tb...@alumni.duke.edu>.
First, I think you're right that it's problematic that KahaDB checkpoint
operations and "real" usage of the memory store are able to block one
another, though my reasons for saying that may not be quite the same as
yours.  Fundamentally, the memory store and the persistence store are
intended to be two independent storage areas, used for different purposes
(one stores persistent messages, one stores non-persistent messages).  One
should be able to fill (and invoke flow control on the destinations that
use it) without affecting the other.  Yet what's actually happened is that
we needed the ability to read messages from the persistent store into
memory, and instead of recognizing the abstraction that the memory store is
for non-persistent memory (and therefore not for the temporarily-in-memory
copies of persistent messages as they are read into cursors), we treated
the memory store as a catch-all for any copy of a message that wasn't in a
persistent store.  Obviously they have to go somewhere, but there's already
a catch-all place for storing stuff that's being used for current
processing, and it's called the heap, and we should have used it for copies
of messages read in by cursors (or any other mechanism for reading messages
from the persistent store) instead of placing them into the memory store.

One argument for putting the messages into the memory store might have been
to ensure that lots of cursor-intensive operations don't cause the broker
to OOM.  I don't agree with that argument.  Overloading a piece of software
beyond the memory capacity that you've chosen to configure it with will
always cause the software to OOM; if you connect to the broker with a
billion separate connections, it's going to OOM, so I don't see why trying
to do too much simultaneous browsing of the persistent store should be any
different.  The argument will always be, "if the way you're using the
broker causes it to run out of memory, then either give it more memory so
you don't run out, or stop doing whatever you're doing that's causing it to
use so much memory;" there's no need to use the memory store as an attempt
to protect us from that problem.

So I roughly agree with the general premise that KahaDB operations should
be unaffected by a full memory store and should not cause the memory store
to fill, though my proposal for how to solve the problem differs from
yours, and I don't think that AMQ-6115 really captures what I'm proposing.
I'll submit a separate JIRA to propose that when I get some time this
weekend.

But putting that aside, it's obvious from your description that how you've
currently configured the broker is insufficient for the usage patterns
you're looking to support.  (If it was sufficient, the checkpoint would
succeed and everything would be find, as you've proven by your experiment
of increasing the memory store size and the heap size.)  So the same
argument I described halfway through the paragraph about OOMs applies to
you here: you're trying to do too much for the memory resources you've
allocated to your broker, so you either need to increase your resource
allocation or decrease your workload.  (Or we need to decrease the amount
of memory used by the checkpointing algorithm; more on that in a minute.)

Increasing your resource allocation is simple, but you say it's not
possible; I'm very skeptical that it's truly not possible, especially when
you say the host has 16GB total but you're only allowed 1GB, and would
recommend you question your assumptions (and those of your bosses, systems
engineers, customers, etc.) to see if it's actually not possible or just
not easy, but ultimately you know your scenario and if it's not possible
then it's not possible.

Decreasing your workload in this case means reducing the amount of work
required to checkpoint your KahaDB instance.  I don't have a lot of
expertise on the KahaDB checkpoint algorithm, but I'd assume that you'll
use less memory store if you have fewer messages in your KahaDB instance,
and that you might also use less if you have the same number of messages in
fewer destinations (since I assume that the cursors only have one page
worth of messages loaded at a time, so fewer cursors would mean fewer pages
loaded and therefore less memory usage at any one time).  So one option
might be to rearchitect your use of destinations to push messages into
fewer destinations and use selectors to ensure that the right consumers get
the right ones.  Another solution that could work would be to keep messages
for less time, either by discarding them if they're unconsumed after a
certain amount of time, by changing your consumers to reduce the max
possible time before they consume, or by consuming them yourself and
storing them into a database, to be retrieved later.  ActiveMQ is not a
database and is not optimized to act as one; it's optimized for delivery of
messages shortly after the are produced, and by keeping large amounts of
data available for days at a time, you're using it in ways it's not
optimized for (as you've discovered), but other products are optimized to
be used as a database and you could consume content yourself and write it
into one of those products if it's unconsumed after a certain amount of
time.  A third solution would be to provide dedicated brokers for each
consumer with a hub-and-spoke topology, so that there are fewer messages in
any given broker (so you've got a better chance of not running out of
memory store space while checkpointing) and one consumer going offline
doesn't affect the other brokers since the other brokers won't see the
offline consumer's messages.  A fourth option could be to shard your
destinations across multiple central brokers (standalone, not part of a
network of brokers), where each one is responsible for only certain
destinations and consumers connect to all brokers and know which
destinations are found on which broker.  There might be other ways to do
this, but those are the four I can think of: reduce your number of
destinations, reduce your message volume by not keeping them on the broker
as long, reduce your messages per broker by using a network of brokers, or
reduce your messages per broker by sharding across multiple standalone
brokers.

I promised to come back to whether the ActiveMQ code can be changed to
reduce the memory footprint used by checkpointing, so here it is.  I
haven't looked at the code that does the checkpointing, but I would assume
that it would be possible to configure it in ways that don't use as much of
the memory store, possibly at the cost of having checkpoints take longer.
One possibility might be to reduce the page size of the cursors that are
used for checkpointing each destination, so that each one puts fewer
messages at a time into the memory store.  Another option might be to limit
the number of destinations that can be checkpointed in parallel, so there
are at most N * Page Size messages in the memory store at a time due to
checkpointing.  Maybe there are other options as well, but those are the
two that jumped to mind without having read the code.  Both changes might
increase the time required to perform a checkpoint, so they'd need to be
configuration options that were exposed rather than changing default
behavior.  I don't want to submit a JIRA enhancement request without having
read the code to confirm that it works the way I've assumed it does, so I'm
not going to submit a JIRA entry about these ideas at the moment.  If you
have time to find and read the source code and evaluate whether these
proposed solutions make sense (and whether there's a better option), then
please do, otherwise I'll try to get to it when I have some time.  Either
way, this won't give you an immediate fix and there's no guarantee that any
enhancement you request will be implemented, so in the meantime I'd
encourage you to consider some of the other things I've recommended to you,
so you have options between now and whenever a version containing one of
these enhancements is released.

Tim

On Wed, Jan 13, 2016 at 8:43 AM, Klaus Pittig <
klaus.pittig@futura4retail.com> wrote:

> a.) Regarding your last answer (thanks for your effort by the way):
>
> I'm aware of the relation between the heap and the systemUsage
> memoryLimit and we make sure that there are no illogical settings.
> The primary requirement is to have a stable system running 'forever'
> w/o any memory issues at any time independent from the load/throughput.
> No one really wants to deal with memory settings on the edge of limits.
>
> You're right: the memory is completely consumed. And I can't guarantee
> the checkpoint/cleanup to be finished completely, so the system can be
> stalled without giving GC a chance to release some memory.
>
> It's the expiry check causing this. The persistent stores themselves
> seem to be managed as expected (no issues, no inconsistency, no loss);
> our situation is independent of the storage (reproducable for leveldb
> and kahadb). For KahaDB we use 16mb for journal files since years
> (helps to save a huge amount of space required for pending messages
> not consumed for some days due to offline situations on client side).
> Anyway, here is our current configuration you requested:
>
> <persistenceAdapter>
>   <kahaDB directory="${activemq.base}/data/kahadb"
> enableIndexWriteAsync="true" journalMaxFileLength="16mb"
> indexWriteBatchSize="10000" indexCacheSize="10000" />
> <!--
>   <levelDB directory="${activemq.base}/data/leveldb" logSize="33554432" />
> -->
> </persistenceAdapter>
>
>
> b.) Some proposal concerning AMQ-6115:
>
> In my point of view, it's worth to discuss the one and only
> memoryLimit parameter used for both the regular browse/consume threads
> and the checkpoint/cleanup threads.
> There should always be enough space to browse/consume any queue at
> least with prefetch 1 resp. one of the next pending messages.
> Maybe - in this case - 2 well-balanced memoryLimit parameters with
> priority on consumption instead of checkpoint/cleanup are helpful for
> a a better regulation. Or something near it.
>
>
> c.) Our results and an acceptable solution so far:
>
> After a thorough investigation (w/o changing ActiveMQ source code) the
> result is for now that we need to accept the limitations defined by
> the single memoryLimit parameter used both for the #checkpoint/cleanup
> process and browsing/consuming queues.
>
> **1.) Memory**
>
> There is not a problem, if we use a much higher memoryLimit (together
> with a higher max-heap) to support both the message caching per
> destination during the #checkpoint/cleanup workflow and our
> requirements to browse/consume messages.
>
> But more memory is not an option in our scenario, we need to deal with
> 1024m max-heap and 500m memoryLimit.
>
> Besides this, constantly setting higher memoryLimits just because of
> more persistent queues containing hundreds/thousands of pending
> messages together with certain offline/inactive consumer scenarios
> should be discussed in detail (IMHO).
>
>
> **2.) Persistent Adapters**
>
> We ruled out persistent adapters as the cause of the problem, because
> the behaviour doesn't change, if we switch different types of
> persistent stores (KahaDB, LevelDB, JDBC-PostgreSQL).
>
> During the debugging sessions with KahaDB we also see regular
> checkpoint handling, the storage is managed as expected.
>
>
> **3.) Destination Policy / Expiration Check**
>
> Our problem completely disappears, if we disable caching and the
> expiration check, which is the actual cause of the problem.
>
> The corresponding properties are documented and there is a nice blog
> article about Message Priorities with a description quite suitable for
> our scenario:
>
> - http://activemq.apache.org/how-can-i-support-priority-queues.html
> -
>
> http://blog.christianposta.com/activemq/activemq-message-priorities-how-it-works/
>
> We simply added useCache="false" and expireMessagesPeriod="0" to the
> policyEntry:
>
> <destinationPolicy>
>   <policyMap>
>     <policyEntries>
>       <policyEntry queue=">" producerFlowControl="false"
> optimizedDispatch="true" memoryLimit="128mb"
> timeBeforeDispatchStarts="1000"
>                              useCache="false" expireMessagesPeriod="0">
>         <dispatchPolicy>
>           <strictOrderDispatchPolicy />
>         </dispatchPolicy>
>         <pendingQueuePolicy>
>           <storeCursor />
>         </pendingQueuePolicy>
>       </policyEntry>
>     </policyEntries>
>   </policyMap>
> </destinationPolicy>
>
>
> The consequences are clear, if we don't use in-mem caching anymore and
> never check for message expiration.
>
> For we neither use message expiration nor message priorities and the
> current message dispatching is fast enough for us, this trade-off is
> acceptable regarding given system limitations.
>
> One should also think about well-defined prefetch limits for memory
> consumption during specific workflows. Message sizes in our scenario
> can be 2 Bytes up to approx. 100 KB, so more individual policyEntries
> and client consumer configurations could be helpful to optimize system
> behaviour concerning performance and memory usage (see
> http://activemq.apache.org/per-destination-policies.html).
>
>
> Cheers
> Klaus
>
>
> Am 11.01.16 um 15:35 schrieb Tim Bain:
> > I believe you are correct: browsing a persistent queue uses bytes
> > from the memory store, because those bytes must be read from the
> > persistence store into the memory store before they can be handed
> > off to browsers or consumers.  If all available bytes in the memory
> > store are already in use, the messages can't be paged into the
> > memory store, and so the operation that required them to be paged
> > in will hang/fail.
> >
> > You can work around the problem by increasing your memory store
> > size via trial-and-error until the problem goes away.  Note that
> > the broker itself needs some amount of memory, so you can't give
> > the whole heap over to the memory store or you'll risk getting
> > OOMs, which means you may need to increase the heap size as well.
> > You can estimate how much memory the broker needs aside from the
> > memory store by subtracting the bytes used for the memory store
> > (539 MB) from the total heap bytes used as measured via JConsole or
> > similar tools.  I'd double (or more) that number to be safe, if it
> > was me; the last thing I want to deal with in a production
> > application (ActiveMQ or anything else) is running out of memory
> > because I tried to cut the memory limits too close just to save a
> > little RAM.
> >
> > All of that is how to work around the fact that before you try to
> > browse your queue, something else has already consumed all
> > available bytes in the memory store.  If you want to dig into why
> > that's happening, we'd need to try to figure out what those bytes
> > are being used for and whether it's possible to change
> > configuration values to reduce the usage so it fits into your
> > current limit.  There will definitely be more effort required than
> > simply increasing the memory limit (and max heap size), but we can
> > try if you're not able to increase the limits enough to fix the
> > problem.
> >
> > If you want to go down that path, one thread to pull on is your
> > observation that you "can browse/consume some Queues  _until_ the
> > #checkpoint call after 30 seconds."  I assume from your reference
> > to checkpointing that you're using KahaDB as your persistence
> > store.  Can you post the KahaDB portion of your config?
> >
> > Your statements here and in your StackOverflow post (
> >
> http://stackoverflow.com/questions/34679854/how-to-avoid-blocking-of-queue-browsing-after-activemq-checkpoint-call
> )
> >
> >
> indicate that you think that the problem is that memory isn't getting
> > garbage collected after the operation that needed it (i.e. the
> > checkpoint) completes, but it's also possible that the checkpoint
> > operation isn't completing because it can't get enough messages
> > read into the memory store.  Have you confirmed via the thread dump
> > that there is not a checkpoint operation still in progress?  Also,
> > how large are your journal files that are getting checkpointed?  If
> > they're large enough that all messages for one file won't fit into
> > the memory store, you might be able to prevent the problem by using
> > smaller files.
> >
> > Tim On Jan 8, 2016 9:32 AM, "Klaus Pittig"
> > <kl...@futura4retail.com> wrote:
> >
> >> If I increase the JVM max heap size (4GB), the behavior does not
> >> change. In my point of view, the configured memoryLimit (500 MB)
> >> works as expected (heapdump shows same max. size for the
> >> TextMessage content, i.e. 55002 byte[] instances containing 539
> >> MB total).
> >>
> >> However, trying to browse a queue shows no content, even if there
> >> is enough heap memory available.
> >>
> >> As far as i understand the sourcecode, this also due to the
> >> configured memoryLimit, because - i hope this is the answer you
> >> expect - the calculation for available causes hasSpace = false.
> >>
> >> I found this here:
> >>
> >> AbstractPendingMessageCursor { public boolean hasSpace() { return
> >> systemUsage != null ?
> >> (!systemUsage.getMemoryUsage().isFull(memoryUsageHighWaterMark))
> >> : true; } public boolean isFull() { return systemUsage != null ?
> >> systemUsage.getMemoryUsage().isFull() : false; } }
> >>
> >>
> >> #hasSpace is in this case called during a click on a queue in
> >> the Webconsole; see the 2 stacks during this workflow:
> >>
> >> Daemon Thread [Queue:aaa114] (Suspended (breakpoint at line 107
> >> in QueueStorePrefetch)) owns: QueueStorePrefetch (id=6036) owns:
> >> StoreQueueCursor (id=6037) owns: Object (id=6038)
> >> QueueStorePrefetch.doFillBatch() line: 107
> >> QueueStorePrefetch(AbstractStoreCursor).fillBatch() line: 381
> >> QueueStorePrefetch(AbstractStoreCursor).reset() line: 142
> >> StoreQueueCursor.reset() line: 159
> >> Queue.doPageInForDispatch(boolean, boolean) line: 1897
> >> Queue.pageInMessages(boolean) line: 2119 Queue.iterate() line:
> >> 1596 DedicatedTaskRunner.runTask() line: 112
> >> DedicatedTaskRunner$1.run() line: 42
> >>
> >> Daemon Thread [ActiveMQ VMTransport: vm://localhost#1]
> >> (Suspended (breakpoint at line 107 in QueueStorePrefetch)) owns:
> >> QueueStorePrefetch (id=5974) owns: StoreQueueCursor (id=5975)
> >> owns: Object (id=5976) owns: Object (id=5977)
> >> QueueStorePrefetch.doFillBatch() line: 107
> >> QueueStorePrefetch(AbstractStoreCursor).fillBatch() line: 381
> >> QueueStorePrefetch(AbstractStoreCursor).reset() line: 142
> >> StoreQueueCursor.reset() line: 159
> >> Queue.doPageInForDispatch(boolean, boolean) line: 1897
> >> Queue.pageInMessages(boolean) line: 2119 Queue.iterate() line:
> >> 1596 Queue.wakeup() line: 1822
> >> Queue.addSubscription(ConnectionContext, Subscription) line: 491
> >> ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext,
> >>
> >>
> ConsumerInfo) line: 399
> >> ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext,
> >> ConsumerInfo) line: 427
> >> ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo)
> >> line: 244
> >> AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext,
> >> ConsumerInfo) line: 102
> >> AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line:
> >> 104
> >> CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext,
> >>
> >>
> ConsumerInfo)
> >> line: 102
> >> TransactionBroker(BrokerFilter).addConsumer(ConnectionContext,
> >> ConsumerInfo) line: 102
> >> StatisticsBroker(BrokerFilter).addConsumer(ConnectionContext,
> >> ConsumerInfo) line: 102
> >> BrokerService$5(MutableBrokerFilter).addConsumer(ConnectionContext,
> >>
> >>
> ConsumerInfo) line: 107
> >> TransportConnection.processAddConsumer(ConsumerInfo) line: 663
> >> ConsumerInfo.visit(CommandVisitor) line: 348
> >> TransportConnection.service(Command) line: 334
> >> TransportConnection$1.onCommand(Object) line: 188
> >> ResponseCorrelator.onCommand(Object) line: 116
> >> MutexTransport.onCommand(Object) line: 50 VMTransport.iterate()
> >> line: 248 DedicatedTaskRunner.runTask() line: 112
> >> DedicatedTaskRunner$1.run() line: 42
> >>
> >>
> >>
> >> Setting queueBrowsePrefetch="1" and queuePrefetch="1" in the
> >> PolicyEntry for queue=">" also has no effect.
> >>
> >>
> >> Am 08.01.16 um 16:32 schrieb Tim Bain:
> >>> If you increase your JVM size (4GB, 8GB, etc., the biggest your
> >>> OS and hardware will support), does the behavior change?  Does
> >>> it truly take all available memory, or just all the memory that
> >>> you've made available to it (which isn't tiny but really isn't
> >>> all that big)?
> >>>
> >>> Also, how do you know that the MessageCursor seems to decide
> >>> that there is not enough memory and stops delivery of queue
> >>> content to browsers/consumers?  What symptom tells you that? On
> >>> Jan 8, 2016 8:25 AM, "Klaus Pittig"
> >>> <kl...@futura4retail.com> wrote:
> >>>
> >>>> (related issue:
> >>>> https://issues.apache.org/jira/browse/AMQ-6115)
> >>>>
> >>>> There's a problem when Using ActiveMQ with a large number of
> >>>> Persistence Queues (250) á 1000 persistent TextMessages á 10
> >>>> KB.
> >>>>
> >>>> Our scenario requires these messages to remain in the storage
> >>>> over a long time (days), until they are consumed (large
> >>>> amounts of data are staged for distribution for many
> >>>> consumer, that could be offline for some days).
> >>>>
> >>>>
> >>>> After the Persistence Store is filled with these Messages and
> >>>> after a broker restart we can browse/consume some Queues
> >>>> _until_ the #checkpoint call after 30 seconds.
> >>>>
> >>>> This call causes the broker to use all available memory and
> >>>> never releases it for other tasks such as Queue
> >>>> browse/consume. Internally the MessageCursor seems to decide,
> >>>> that there is not enough memory and stops delivery of queue
> >>>> content to browsers/consumers.
> >>>>
> >>>>
> >>>> => Is there a way to avoid this behaviour by configuration or
> >>>> is this a bug?
> >>>>
> >>>> The expectation is, that we can consume/browse any queue
> >>>> under all circumstances.
> >>>>
> >>>> Settings below are in production for some time now and
> >>>> several recommendations are applied found in the ActiveMQ
> >>>> documentation (destination policies, systemUsage, persistence
> >>>> store options etc.)
> >>>>
> >>>> - Behaviour is tested with ActiveMQ: 5.11.2, 5.13.0 and
> >>>> 5.5.1. - Memory Settings: Xmx=1024m - Java: 1.8 or 1.7 - OS:
> >>>> Windows, MacOS, Linux - PersistenceAdapter: KahaDB or
> >>>> LevelDB - Disc: enough free space (200 GB) and physical
> >>>> memory (16 GB max).
> >>>>
> >>>> Besides the above mentioned settings we use the following
> >>>> settings for the broker (btw: changing the memoryLimit to a
> >>>> lower value like 1mb does not change the situation):
> >>>>
> >>>> <destinationPolicy> <policyMap> <policyEntries> <policyEntry
> >>>> queue=">" producerFlowControl="false"
> >>>> optimizedDispatch="true" memoryLimit="128mb"
> >>>> timeBeforeDispatchStarts="1000"> <dispatchPolicy>
> >>>> <strictOrderDispatchPolicy /> </dispatchPolicy>
> >>>> <pendingQueuePolicy> <storeCursor /> </pendingQueuePolicy>
> >>>> </policyEntry> </policyEntries> </policyMap>
> >>>> </destinationPolicy> <systemUsage> <systemUsage
> >>>> sendFailIfNoSpace="true"> <memoryUsage> <memoryUsage
> >>>> limit="50 mb" /> </memoryUsage> <storeUsage> <storeUsage
> >>>> limit="80000 mb" /> </storeUsage> <tempUsage> <tempUsage
> >>>> limit="1000 mb" /> </tempUsage> </systemUsage>
> >>>> </systemUsage>
> >>>>
> >>>> If we set the **cursorMemoryHighWaterMark** in the
> >>>> destinationPolicy to a higher value like **150** or **600**
> >>>> depending on the difference between memoryUsage and the
> >>>> available heap space relieves the situation a bit for a
> >>>> workaround, but this is not really an option for production
> >>>> systems in my point of view.
> >>>>
> >>>> Screenie with information from Oracle Mission Control showing
> >>>> those ActiveMQTextMessage instances that are never released
> >>>> from memory:
> >>>>
> >>>> http://goo.gl/EjEixV
> >>>>
> >>>>
> >>>> Cheers Klaus
> >>>>
> >>>
> >>
> >
>

Re: How to avoid blocking of queue browsing after ActiveMQ checkpoint call

Posted by Klaus Pittig <kl...@futura4retail.com>.
a.) Regarding your last answer (thanks for your effort by the way):

I'm aware of the relation between the heap and the systemUsage
memoryLimit and we make sure that there are no illogical settings.
The primary requirement is to have a stable system running 'forever'
w/o any memory issues at any time independent from the load/throughput.
No one really wants to deal with memory settings on the edge of limits.

You're right: the memory is completely consumed. And I can't guarantee
the checkpoint/cleanup to be finished completely, so the system can be
stalled without giving GC a chance to release some memory.

It's the expiry check causing this. The persistent stores themselves
seem to be managed as expected (no issues, no inconsistency, no loss);
our situation is independent of the storage (reproducable for leveldb
and kahadb). For KahaDB we use 16mb for journal files since years
(helps to save a huge amount of space required for pending messages
not consumed for some days due to offline situations on client side).
Anyway, here is our current configuration you requested:

<persistenceAdapter>
  <kahaDB directory="${activemq.base}/data/kahadb"
enableIndexWriteAsync="true" journalMaxFileLength="16mb"
indexWriteBatchSize="10000" indexCacheSize="10000" />
<!--
  <levelDB directory="${activemq.base}/data/leveldb" logSize="33554432" />
-->
</persistenceAdapter>


b.) Some proposal concerning AMQ-6115:

In my point of view, it's worth to discuss the one and only
memoryLimit parameter used for both the regular browse/consume threads
and the checkpoint/cleanup threads.
There should always be enough space to browse/consume any queue at
least with prefetch 1 resp. one of the next pending messages.
Maybe - in this case - 2 well-balanced memoryLimit parameters with
priority on consumption instead of checkpoint/cleanup are helpful for
a a better regulation. Or something near it.


c.) Our results and an acceptable solution so far:

After a thorough investigation (w/o changing ActiveMQ source code) the
result is for now that we need to accept the limitations defined by
the single memoryLimit parameter used both for the #checkpoint/cleanup
process and browsing/consuming queues.

**1.) Memory**

There is not a problem, if we use a much higher memoryLimit (together
with a higher max-heap) to support both the message caching per
destination during the #checkpoint/cleanup workflow and our
requirements to browse/consume messages.

But more memory is not an option in our scenario, we need to deal with
1024m max-heap and 500m memoryLimit.

Besides this, constantly setting higher memoryLimits just because of
more persistent queues containing hundreds/thousands of pending
messages together with certain offline/inactive consumer scenarios
should be discussed in detail (IMHO).


**2.) Persistent Adapters**

We ruled out persistent adapters as the cause of the problem, because
the behaviour doesn't change, if we switch different types of
persistent stores (KahaDB, LevelDB, JDBC-PostgreSQL).

During the debugging sessions with KahaDB we also see regular
checkpoint handling, the storage is managed as expected.


**3.) Destination Policy / Expiration Check**

Our problem completely disappears, if we disable caching and the
expiration check, which is the actual cause of the problem.

The corresponding properties are documented and there is a nice blog
article about Message Priorities with a description quite suitable for
our scenario:

- http://activemq.apache.org/how-can-i-support-priority-queues.html
-
http://blog.christianposta.com/activemq/activemq-message-priorities-how-it-works/

We simply added useCache="false" and expireMessagesPeriod="0" to the
policyEntry:

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">" producerFlowControl="false"
optimizedDispatch="true" memoryLimit="128mb"
timeBeforeDispatchStarts="1000"
                             useCache="false" expireMessagesPeriod="0">
        <dispatchPolicy>
          <strictOrderDispatchPolicy />
        </dispatchPolicy>
        <pendingQueuePolicy>
          <storeCursor />
        </pendingQueuePolicy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>


The consequences are clear, if we don't use in-mem caching anymore and
never check for message expiration.

For we neither use message expiration nor message priorities and the
current message dispatching is fast enough for us, this trade-off is
acceptable regarding given system limitations.

One should also think about well-defined prefetch limits for memory
consumption during specific workflows. Message sizes in our scenario
can be 2 Bytes up to approx. 100 KB, so more individual policyEntries
and client consumer configurations could be helpful to optimize system
behaviour concerning performance and memory usage (see
http://activemq.apache.org/per-destination-policies.html).


Cheers
Klaus


Am 11.01.16 um 15:35 schrieb Tim Bain:
> I believe you are correct: browsing a persistent queue uses bytes
> from the memory store, because those bytes must be read from the
> persistence store into the memory store before they can be handed
> off to browsers or consumers.  If all available bytes in the memory
> store are already in use, the messages can't be paged into the
> memory store, and so the operation that required them to be paged
> in will hang/fail.
> 
> You can work around the problem by increasing your memory store
> size via trial-and-error until the problem goes away.  Note that
> the broker itself needs some amount of memory, so you can't give
> the whole heap over to the memory store or you'll risk getting
> OOMs, which means you may need to increase the heap size as well.
> You can estimate how much memory the broker needs aside from the
> memory store by subtracting the bytes used for the memory store
> (539 MB) from the total heap bytes used as measured via JConsole or
> similar tools.  I'd double (or more) that number to be safe, if it
> was me; the last thing I want to deal with in a production
> application (ActiveMQ or anything else) is running out of memory
> because I tried to cut the memory limits too close just to save a
> little RAM.
> 
> All of that is how to work around the fact that before you try to
> browse your queue, something else has already consumed all
> available bytes in the memory store.  If you want to dig into why
> that's happening, we'd need to try to figure out what those bytes
> are being used for and whether it's possible to change
> configuration values to reduce the usage so it fits into your
> current limit.  There will definitely be more effort required than 
> simply increasing the memory limit (and max heap size), but we can
> try if you're not able to increase the limits enough to fix the
> problem.
> 
> If you want to go down that path, one thread to pull on is your
> observation that you "can browse/consume some Queues  _until_ the
> #checkpoint call after 30 seconds."  I assume from your reference
> to checkpointing that you're using KahaDB as your persistence
> store.  Can you post the KahaDB portion of your config?
> 
> Your statements here and in your StackOverflow post ( 
> http://stackoverflow.com/questions/34679854/how-to-avoid-blocking-of-queue-browsing-after-activemq-checkpoint-call)
>
> 
indicate that you think that the problem is that memory isn't getting
> garbage collected after the operation that needed it (i.e. the
> checkpoint) completes, but it's also possible that the checkpoint
> operation isn't completing because it can't get enough messages
> read into the memory store.  Have you confirmed via the thread dump
> that there is not a checkpoint operation still in progress?  Also,
> how large are your journal files that are getting checkpointed?  If
> they're large enough that all messages for one file won't fit into
> the memory store, you might be able to prevent the problem by using
> smaller files.
> 
> Tim On Jan 8, 2016 9:32 AM, "Klaus Pittig"
> <kl...@futura4retail.com> wrote:
> 
>> If I increase the JVM max heap size (4GB), the behavior does not
>> change. In my point of view, the configured memoryLimit (500 MB)
>> works as expected (heapdump shows same max. size for the
>> TextMessage content, i.e. 55002 byte[] instances containing 539
>> MB total).
>> 
>> However, trying to browse a queue shows no content, even if there
>> is enough heap memory available.
>> 
>> As far as i understand the sourcecode, this also due to the
>> configured memoryLimit, because - i hope this is the answer you
>> expect - the calculation for available causes hasSpace = false.
>> 
>> I found this here:
>> 
>> AbstractPendingMessageCursor { public boolean hasSpace() { return
>> systemUsage != null ? 
>> (!systemUsage.getMemoryUsage().isFull(memoryUsageHighWaterMark))
>> : true; } public boolean isFull() { return systemUsage != null ?
>> systemUsage.getMemoryUsage().isFull() : false; } }
>> 
>> 
>> #hasSpace is in this case called during a click on a queue in
>> the Webconsole; see the 2 stacks during this workflow:
>> 
>> Daemon Thread [Queue:aaa114] (Suspended (breakpoint at line 107
>> in QueueStorePrefetch)) owns: QueueStorePrefetch (id=6036) owns:
>> StoreQueueCursor (id=6037) owns: Object (id=6038) 
>> QueueStorePrefetch.doFillBatch() line: 107 
>> QueueStorePrefetch(AbstractStoreCursor).fillBatch() line: 381 
>> QueueStorePrefetch(AbstractStoreCursor).reset() line: 142 
>> StoreQueueCursor.reset() line: 159 
>> Queue.doPageInForDispatch(boolean, boolean) line: 1897 
>> Queue.pageInMessages(boolean) line: 2119 Queue.iterate() line:
>> 1596 DedicatedTaskRunner.runTask() line: 112 
>> DedicatedTaskRunner$1.run() line: 42
>> 
>> Daemon Thread [ActiveMQ VMTransport: vm://localhost#1]
>> (Suspended (breakpoint at line 107 in QueueStorePrefetch)) owns:
>> QueueStorePrefetch (id=5974) owns: StoreQueueCursor (id=5975) 
>> owns: Object (id=5976) owns: Object (id=5977) 
>> QueueStorePrefetch.doFillBatch() line: 107 
>> QueueStorePrefetch(AbstractStoreCursor).fillBatch() line: 381 
>> QueueStorePrefetch(AbstractStoreCursor).reset() line: 142 
>> StoreQueueCursor.reset() line: 159 
>> Queue.doPageInForDispatch(boolean, boolean) line: 1897 
>> Queue.pageInMessages(boolean) line: 2119 Queue.iterate() line:
>> 1596 Queue.wakeup() line: 1822 
>> Queue.addSubscription(ConnectionContext, Subscription) line: 491 
>> ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext,
>>
>> 
ConsumerInfo) line: 399
>> ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, 
>> ConsumerInfo) line: 427 
>> ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo)
>> line: 244 
>> AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, 
>> ConsumerInfo) line: 102 
>> AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line:
>> 104 
>> CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext,
>>
>> 
ConsumerInfo)
>> line: 102 
>> TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, 
>> ConsumerInfo) line: 102 
>> StatisticsBroker(BrokerFilter).addConsumer(ConnectionContext, 
>> ConsumerInfo) line: 102 
>> BrokerService$5(MutableBrokerFilter).addConsumer(ConnectionContext,
>>
>> 
ConsumerInfo) line: 107
>> TransportConnection.processAddConsumer(ConsumerInfo) line: 663 
>> ConsumerInfo.visit(CommandVisitor) line: 348 
>> TransportConnection.service(Command) line: 334 
>> TransportConnection$1.onCommand(Object) line: 188 
>> ResponseCorrelator.onCommand(Object) line: 116 
>> MutexTransport.onCommand(Object) line: 50 VMTransport.iterate()
>> line: 248 DedicatedTaskRunner.runTask() line: 112 
>> DedicatedTaskRunner$1.run() line: 42
>> 
>> 
>> 
>> Setting queueBrowsePrefetch="1" and queuePrefetch="1" in the 
>> PolicyEntry for queue=">" also has no effect.
>> 
>> 
>> Am 08.01.16 um 16:32 schrieb Tim Bain:
>>> If you increase your JVM size (4GB, 8GB, etc., the biggest your
>>> OS and hardware will support), does the behavior change?  Does
>>> it truly take all available memory, or just all the memory that
>>> you've made available to it (which isn't tiny but really isn't
>>> all that big)?
>>> 
>>> Also, how do you know that the MessageCursor seems to decide
>>> that there is not enough memory and stops delivery of queue
>>> content to browsers/consumers?  What symptom tells you that? On
>>> Jan 8, 2016 8:25 AM, "Klaus Pittig"
>>> <kl...@futura4retail.com> wrote:
>>> 
>>>> (related issue:
>>>> https://issues.apache.org/jira/browse/AMQ-6115)
>>>> 
>>>> There's a problem when Using ActiveMQ with a large number of
>>>> Persistence Queues (250) á 1000 persistent TextMessages á 10
>>>> KB.
>>>> 
>>>> Our scenario requires these messages to remain in the storage
>>>> over a long time (days), until they are consumed (large
>>>> amounts of data are staged for distribution for many
>>>> consumer, that could be offline for some days).
>>>> 
>>>> 
>>>> After the Persistence Store is filled with these Messages and
>>>> after a broker restart we can browse/consume some Queues
>>>> _until_ the #checkpoint call after 30 seconds.
>>>> 
>>>> This call causes the broker to use all available memory and
>>>> never releases it for other tasks such as Queue
>>>> browse/consume. Internally the MessageCursor seems to decide,
>>>> that there is not enough memory and stops delivery of queue
>>>> content to browsers/consumers.
>>>> 
>>>> 
>>>> => Is there a way to avoid this behaviour by configuration or
>>>> is this a bug?
>>>> 
>>>> The expectation is, that we can consume/browse any queue
>>>> under all circumstances.
>>>> 
>>>> Settings below are in production for some time now and
>>>> several recommendations are applied found in the ActiveMQ
>>>> documentation (destination policies, systemUsage, persistence
>>>> store options etc.)
>>>> 
>>>> - Behaviour is tested with ActiveMQ: 5.11.2, 5.13.0 and
>>>> 5.5.1. - Memory Settings: Xmx=1024m - Java: 1.8 or 1.7 - OS:
>>>> Windows, MacOS, Linux - PersistenceAdapter: KahaDB or
>>>> LevelDB - Disc: enough free space (200 GB) and physical
>>>> memory (16 GB max).
>>>> 
>>>> Besides the above mentioned settings we use the following
>>>> settings for the broker (btw: changing the memoryLimit to a
>>>> lower value like 1mb does not change the situation):
>>>> 
>>>> <destinationPolicy> <policyMap> <policyEntries> <policyEntry
>>>> queue=">" producerFlowControl="false" 
>>>> optimizedDispatch="true" memoryLimit="128mb" 
>>>> timeBeforeDispatchStarts="1000"> <dispatchPolicy> 
>>>> <strictOrderDispatchPolicy /> </dispatchPolicy> 
>>>> <pendingQueuePolicy> <storeCursor /> </pendingQueuePolicy> 
>>>> </policyEntry> </policyEntries> </policyMap> 
>>>> </destinationPolicy> <systemUsage> <systemUsage
>>>> sendFailIfNoSpace="true"> <memoryUsage> <memoryUsage
>>>> limit="50 mb" /> </memoryUsage> <storeUsage> <storeUsage
>>>> limit="80000 mb" /> </storeUsage> <tempUsage> <tempUsage
>>>> limit="1000 mb" /> </tempUsage> </systemUsage> 
>>>> </systemUsage>
>>>> 
>>>> If we set the **cursorMemoryHighWaterMark** in the
>>>> destinationPolicy to a higher value like **150** or **600**
>>>> depending on the difference between memoryUsage and the
>>>> available heap space relieves the situation a bit for a
>>>> workaround, but this is not really an option for production 
>>>> systems in my point of view.
>>>> 
>>>> Screenie with information from Oracle Mission Control showing
>>>> those ActiveMQTextMessage instances that are never released
>>>> from memory:
>>>> 
>>>> http://goo.gl/EjEixV
>>>> 
>>>> 
>>>> Cheers Klaus
>>>> 
>>> 
>> 
> 

Re: How to avoid blocking of queue browsing after ActiveMQ checkpoint call

Posted by Tim Bain <tb...@alumni.duke.edu>.
I believe you are correct: browsing a persistent queue uses bytes from the
memory store, because those bytes must be read from the persistence store
into the memory store before they can be handed off to browsers or
consumers.  If all available bytes in the memory store are already in use,
the messages can't be paged into the memory store, and so the operation
that required them to be paged in will hang/fail.

You can work around the problem by increasing your memory store size via
trial-and-error until the problem goes away.  Note that the broker itself
needs some amount of memory, so you can't give the whole heap over to the
memory store or you'll risk getting OOMs, which means you may need to
increase the heap size as well.  You can estimate how much memory the
broker needs aside from the memory store by subtracting the bytes used for
the memory store (539 MB) from the total heap bytes used as measured via
JConsole or similar tools.  I'd double (or more) that number to be safe, if
it was me; the last thing I want to deal with in a production application
(ActiveMQ or anything else) is running out of memory because I tried to cut
the memory limits too close just to save a little RAM.

All of that is how to work around the fact that before you try to browse
your queue, something else has already consumed all available bytes in the
memory store.  If you want to dig into why that's happening, we'd need to
try to figure out what those bytes are being used for and whether it's
possible to change configuration values to reduce the usage so it fits into
your current limit.  There will definitely be more effort required than
simply increasing the memory limit (and max heap size), but we can try if
you're not able to increase the limits enough to fix the problem.

If you want to go down that path, one thread to pull on is your observation
that you "can browse/consume some Queues  _until_ the #checkpoint call
after 30 seconds."  I assume from your reference to checkpointing that
you're using KahaDB as your persistence store.  Can you post the KahaDB
portion of your config?

Your statements here and in your StackOverflow post (
http://stackoverflow.com/questions/34679854/how-to-avoid-blocking-of-queue-browsing-after-activemq-checkpoint-call)
indicate that you think that the problem is that memory isn't getting
garbage collected after the operation that needed it (i.e. the checkpoint)
completes, but it's also possible that the checkpoint operation isn't
completing because it can't get enough messages read into the memory
store.  Have you confirmed via the thread dump that there is not a
checkpoint operation still in progress?  Also, how large are your journal
files that are getting checkpointed?  If they're large enough that all
messages for one file won't fit into the memory store, you might be able to
prevent the problem by using smaller files.

Tim
On Jan 8, 2016 9:32 AM, "Klaus Pittig" <kl...@futura4retail.com>
wrote:

> If I increase the JVM max heap size (4GB), the behavior does not change.
> In my point of view, the configured memoryLimit (500 MB) works as
> expected (heapdump shows same max. size for the TextMessage content,
> i.e. 55002 byte[] instances containing 539 MB total).
>
> However, trying to browse a queue shows no content, even if there is
> enough heap memory available.
>
> As far as i understand the sourcecode, this also due to the configured
> memoryLimit, because - i hope this is the answer you expect - the
> calculation for available causes hasSpace = false.
>
> I found this here:
>
> AbstractPendingMessageCursor {
> public boolean hasSpace() {
> return systemUsage != null ?
> (!systemUsage.getMemoryUsage().isFull(memoryUsageHighWaterMark)) : true;
> }
> public boolean isFull() {
> return systemUsage != null ? systemUsage.getMemoryUsage().isFull() :
> false;
> }
> }
>
>
> #hasSpace is in this case called during a click on a queue in the
> Webconsole; see the 2 stacks during this workflow:
>
> Daemon Thread [Queue:aaa114] (Suspended (breakpoint at line 107 in
> QueueStorePrefetch))
> owns: QueueStorePrefetch (id=6036)
> owns: StoreQueueCursor (id=6037)
> owns: Object (id=6038)
> QueueStorePrefetch.doFillBatch() line: 107
> QueueStorePrefetch(AbstractStoreCursor).fillBatch() line: 381
> QueueStorePrefetch(AbstractStoreCursor).reset() line: 142
> StoreQueueCursor.reset() line: 159
> Queue.doPageInForDispatch(boolean, boolean) line: 1897
> Queue.pageInMessages(boolean) line: 2119
> Queue.iterate() line: 1596
> DedicatedTaskRunner.runTask() line: 112
> DedicatedTaskRunner$1.run() line: 42
>
> Daemon Thread [ActiveMQ VMTransport: vm://localhost#1] (Suspended
> (breakpoint at line 107 in QueueStorePrefetch))
> owns: QueueStorePrefetch (id=5974)
> owns: StoreQueueCursor (id=5975)
> owns: Object (id=5976)
> owns: Object (id=5977)
> QueueStorePrefetch.doFillBatch() line: 107
> QueueStorePrefetch(AbstractStoreCursor).fillBatch() line: 381
> QueueStorePrefetch(AbstractStoreCursor).reset() line: 142
> StoreQueueCursor.reset() line: 159
> Queue.doPageInForDispatch(boolean, boolean) line: 1897
> Queue.pageInMessages(boolean) line: 2119
> Queue.iterate() line: 1596
> Queue.wakeup() line: 1822
> Queue.addSubscription(ConnectionContext, Subscription) line: 491
> ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext,
> ConsumerInfo) line: 399
> ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext,
> ConsumerInfo) line: 427
> ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line:
> 244
> AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext,
> ConsumerInfo) line: 102
> AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 104
> CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext,
> ConsumerInfo)
> line: 102
> TransactionBroker(BrokerFilter).addConsumer(ConnectionContext,
> ConsumerInfo) line: 102
> StatisticsBroker(BrokerFilter).addConsumer(ConnectionContext,
> ConsumerInfo) line: 102
> BrokerService$5(MutableBrokerFilter).addConsumer(ConnectionContext,
> ConsumerInfo) line: 107
> TransportConnection.processAddConsumer(ConsumerInfo) line: 663
> ConsumerInfo.visit(CommandVisitor) line: 348
> TransportConnection.service(Command) line: 334
> TransportConnection$1.onCommand(Object) line: 188
> ResponseCorrelator.onCommand(Object) line: 116
> MutexTransport.onCommand(Object) line: 50
> VMTransport.iterate() line: 248
> DedicatedTaskRunner.runTask() line: 112
> DedicatedTaskRunner$1.run() line: 42
>
>
>
> Setting queueBrowsePrefetch="1" and queuePrefetch="1" in the
> PolicyEntry for queue=">" also has no effect.
>
>
> Am 08.01.16 um 16:32 schrieb Tim Bain:
> > If you increase your JVM size (4GB, 8GB, etc., the biggest your OS and
> > hardware will support), does the behavior change?  Does it truly take all
> > available memory, or just all the memory that you've made available to it
> > (which isn't tiny but really isn't all that big)?
> >
> > Also, how do you know that the
> > MessageCursor seems to decide that there is not enough memory and stops
> > delivery of queue content to browsers/consumers?  What symptom tells you
> > that?
> > On Jan 8, 2016 8:25 AM, "Klaus Pittig" <kl...@futura4retail.com>
> > wrote:
> >
> >> (related issue: https://issues.apache.org/jira/browse/AMQ-6115)
> >>
> >> There's a problem when Using ActiveMQ with a large number of Persistence
> >> Queues (250) á 1000 persistent TextMessages á 10 KB.
> >>
> >> Our scenario requires these messages to remain in the storage over a
> >> long time (days), until they are consumed (large amounts of data are
> >> staged for distribution for many consumer, that could be offline for
> >> some days).
> >>
> >>
> >> After the Persistence Store is filled with these Messages and after a
> >> broker restart we can browse/consume some Queues  _until_ the
> >> #checkpoint call after 30 seconds.
> >>
> >> This call causes the broker to use all available memory and never
> >> releases it for other tasks such as Queue browse/consume. Internally the
> >> MessageCursor seems to decide, that there is not enough memory and stops
> >> delivery of queue content to browsers/consumers.
> >>
> >>
> >> => Is there a way to avoid this behaviour by configuration or is this a
> >> bug?
> >>
> >> The expectation is, that we can consume/browse any queue under all
> >> circumstances.
> >>
> >> Settings below are in production for some time now and several
> >> recommendations are applied found in the ActiveMQ documentation
> >> (destination policies, systemUsage, persistence store options etc.)
> >>
> >>  - Behaviour is tested with ActiveMQ: 5.11.2, 5.13.0 and 5.5.1.
> >>  - Memory Settings: Xmx=1024m
> >>  - Java: 1.8 or 1.7
> >>  - OS: Windows, MacOS, Linux
> >>  - PersistenceAdapter: KahaDB or LevelDB
> >>  - Disc: enough free space (200 GB) and physical memory (16 GB max).
> >>
> >> Besides the above mentioned settings we use the following settings for
> >> the broker (btw: changing the memoryLimit to a lower value like 1mb does
> >> not change the situation):
> >>
> >>     <destinationPolicy>
> >>         <policyMap>
> >>             <policyEntries>
> >>                 <policyEntry queue=">" producerFlowControl="false"
> >> optimizedDispatch="true" memoryLimit="128mb"
> >> timeBeforeDispatchStarts="1000">
> >>                     <dispatchPolicy>
> >>                         <strictOrderDispatchPolicy />
> >>                     </dispatchPolicy>
> >>                     <pendingQueuePolicy>
> >>                         <storeCursor />
> >>                     </pendingQueuePolicy>
> >>                 </policyEntry>
> >>             </policyEntries>
> >>         </policyMap>
> >>     </destinationPolicy>
> >>     <systemUsage>
> >>         <systemUsage sendFailIfNoSpace="true">
> >>             <memoryUsage>
> >>                 <memoryUsage limit="50 mb" />
> >>             </memoryUsage>
> >>             <storeUsage>
> >>                 <storeUsage limit="80000 mb" />
> >>             </storeUsage>
> >>             <tempUsage>
> >>                 <tempUsage limit="1000 mb" />
> >>             </tempUsage>
> >>         </systemUsage>
> >>     </systemUsage>
> >>
> >> If we set the **cursorMemoryHighWaterMark** in the destinationPolicy to
> >> a higher value like **150** or **600** depending on the difference
> >> between memoryUsage and the available heap space relieves the situation
> >> a bit for a workaround, but this is not really an option for production
> >> systems in my point of view.
> >>
> >> Screenie with information from Oracle Mission Control showing those
> >> ActiveMQTextMessage instances that are never released from memory:
> >>
> >> http://goo.gl/EjEixV
> >>
> >>
> >> Cheers
> >> Klaus
> >>
> >
>

Re: How to avoid blocking of queue browsing after ActiveMQ checkpoint call

Posted by Klaus Pittig <kl...@futura4retail.com>.
If I increase the JVM max heap size (4GB), the behavior does not change.
In my point of view, the configured memoryLimit (500 MB) works as
expected (heapdump shows same max. size for the TextMessage content,
i.e. 55002 byte[] instances containing 539 MB total).

However, trying to browse a queue shows no content, even if there is
enough heap memory available.

As far as i understand the sourcecode, this also due to the configured
memoryLimit, because - i hope this is the answer you expect - the
calculation for available causes hasSpace = false.

I found this here:

AbstractPendingMessageCursor {
public boolean hasSpace() {
return systemUsage != null ?
(!systemUsage.getMemoryUsage().isFull(memoryUsageHighWaterMark)) : true;
}
public boolean isFull() {
return systemUsage != null ? systemUsage.getMemoryUsage().isFull() :
false;
}
}


#hasSpace is in this case called during a click on a queue in the
Webconsole; see the 2 stacks during this workflow:

Daemon Thread [Queue:aaa114] (Suspended (breakpoint at line 107 in
QueueStorePrefetch))
owns: QueueStorePrefetch (id=6036)
owns: StoreQueueCursor (id=6037)
owns: Object (id=6038)
QueueStorePrefetch.doFillBatch() line: 107
QueueStorePrefetch(AbstractStoreCursor).fillBatch() line: 381
QueueStorePrefetch(AbstractStoreCursor).reset() line: 142
StoreQueueCursor.reset() line: 159
Queue.doPageInForDispatch(boolean, boolean) line: 1897
Queue.pageInMessages(boolean) line: 2119
Queue.iterate() line: 1596
DedicatedTaskRunner.runTask() line: 112
DedicatedTaskRunner$1.run() line: 42

Daemon Thread [ActiveMQ VMTransport: vm://localhost#1] (Suspended
(breakpoint at line 107 in QueueStorePrefetch))
owns: QueueStorePrefetch (id=5974)
owns: StoreQueueCursor (id=5975)
owns: Object (id=5976)
owns: Object (id=5977)
QueueStorePrefetch.doFillBatch() line: 107
QueueStorePrefetch(AbstractStoreCursor).fillBatch() line: 381
QueueStorePrefetch(AbstractStoreCursor).reset() line: 142
StoreQueueCursor.reset() line: 159
Queue.doPageInForDispatch(boolean, boolean) line: 1897
Queue.pageInMessages(boolean) line: 2119
Queue.iterate() line: 1596
Queue.wakeup() line: 1822
Queue.addSubscription(ConnectionContext, Subscription) line: 491
ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext,
ConsumerInfo) line: 399
ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext,
ConsumerInfo) line: 427
ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line:
244
AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext,
ConsumerInfo) line: 102
AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 104
CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext,
ConsumerInfo)
line: 102
TransactionBroker(BrokerFilter).addConsumer(ConnectionContext,
ConsumerInfo) line: 102
StatisticsBroker(BrokerFilter).addConsumer(ConnectionContext,
ConsumerInfo) line: 102
BrokerService$5(MutableBrokerFilter).addConsumer(ConnectionContext,
ConsumerInfo) line: 107
TransportConnection.processAddConsumer(ConsumerInfo) line: 663
ConsumerInfo.visit(CommandVisitor) line: 348
TransportConnection.service(Command) line: 334
TransportConnection$1.onCommand(Object) line: 188
ResponseCorrelator.onCommand(Object) line: 116
MutexTransport.onCommand(Object) line: 50
VMTransport.iterate() line: 248
DedicatedTaskRunner.runTask() line: 112
DedicatedTaskRunner$1.run() line: 42



Setting queueBrowsePrefetch="1" and queuePrefetch="1" in the
PolicyEntry for queue=">" also has no effect.


Am 08.01.16 um 16:32 schrieb Tim Bain:
> If you increase your JVM size (4GB, 8GB, etc., the biggest your OS and
> hardware will support), does the behavior change?  Does it truly take all
> available memory, or just all the memory that you've made available to it
> (which isn't tiny but really isn't all that big)?
> 
> Also, how do you know that the
> MessageCursor seems to decide that there is not enough memory and stops
> delivery of queue content to browsers/consumers?  What symptom tells you
> that?
> On Jan 8, 2016 8:25 AM, "Klaus Pittig" <kl...@futura4retail.com>
> wrote:
> 
>> (related issue: https://issues.apache.org/jira/browse/AMQ-6115)
>>
>> There's a problem when Using ActiveMQ with a large number of Persistence
>> Queues (250) á 1000 persistent TextMessages á 10 KB.
>>
>> Our scenario requires these messages to remain in the storage over a
>> long time (days), until they are consumed (large amounts of data are
>> staged for distribution for many consumer, that could be offline for
>> some days).
>>
>>
>> After the Persistence Store is filled with these Messages and after a
>> broker restart we can browse/consume some Queues  _until_ the
>> #checkpoint call after 30 seconds.
>>
>> This call causes the broker to use all available memory and never
>> releases it for other tasks such as Queue browse/consume. Internally the
>> MessageCursor seems to decide, that there is not enough memory and stops
>> delivery of queue content to browsers/consumers.
>>
>>
>> => Is there a way to avoid this behaviour by configuration or is this a
>> bug?
>>
>> The expectation is, that we can consume/browse any queue under all
>> circumstances.
>>
>> Settings below are in production for some time now and several
>> recommendations are applied found in the ActiveMQ documentation
>> (destination policies, systemUsage, persistence store options etc.)
>>
>>  - Behaviour is tested with ActiveMQ: 5.11.2, 5.13.0 and 5.5.1.
>>  - Memory Settings: Xmx=1024m
>>  - Java: 1.8 or 1.7
>>  - OS: Windows, MacOS, Linux
>>  - PersistenceAdapter: KahaDB or LevelDB
>>  - Disc: enough free space (200 GB) and physical memory (16 GB max).
>>
>> Besides the above mentioned settings we use the following settings for
>> the broker (btw: changing the memoryLimit to a lower value like 1mb does
>> not change the situation):
>>
>>     <destinationPolicy>
>>         <policyMap>
>>             <policyEntries>
>>                 <policyEntry queue=">" producerFlowControl="false"
>> optimizedDispatch="true" memoryLimit="128mb"
>> timeBeforeDispatchStarts="1000">
>>                     <dispatchPolicy>
>>                         <strictOrderDispatchPolicy />
>>                     </dispatchPolicy>
>>                     <pendingQueuePolicy>
>>                         <storeCursor />
>>                     </pendingQueuePolicy>
>>                 </policyEntry>
>>             </policyEntries>
>>         </policyMap>
>>     </destinationPolicy>
>>     <systemUsage>
>>         <systemUsage sendFailIfNoSpace="true">
>>             <memoryUsage>
>>                 <memoryUsage limit="50 mb" />
>>             </memoryUsage>
>>             <storeUsage>
>>                 <storeUsage limit="80000 mb" />
>>             </storeUsage>
>>             <tempUsage>
>>                 <tempUsage limit="1000 mb" />
>>             </tempUsage>
>>         </systemUsage>
>>     </systemUsage>
>>
>> If we set the **cursorMemoryHighWaterMark** in the destinationPolicy to
>> a higher value like **150** or **600** depending on the difference
>> between memoryUsage and the available heap space relieves the situation
>> a bit for a workaround, but this is not really an option for production
>> systems in my point of view.
>>
>> Screenie with information from Oracle Mission Control showing those
>> ActiveMQTextMessage instances that are never released from memory:
>>
>> http://goo.gl/EjEixV
>>
>>
>> Cheers
>> Klaus
>>
> 

Re: How to avoid blocking of queue browsing after ActiveMQ checkpoint call

Posted by Tim Bain <tb...@alumni.duke.edu>.
If you increase your JVM size (4GB, 8GB, etc., the biggest your OS and
hardware will support), does the behavior change?  Does it truly take all
available memory, or just all the memory that you've made available to it
(which isn't tiny but really isn't all that big)?

Also, how do you know that the
MessageCursor seems to decide that there is not enough memory and stops
delivery of queue content to browsers/consumers?  What symptom tells you
that?
On Jan 8, 2016 8:25 AM, "Klaus Pittig" <kl...@futura4retail.com>
wrote:

> (related issue: https://issues.apache.org/jira/browse/AMQ-6115)
>
> There's a problem when Using ActiveMQ with a large number of Persistence
> Queues (250) á 1000 persistent TextMessages á 10 KB.
>
> Our scenario requires these messages to remain in the storage over a
> long time (days), until they are consumed (large amounts of data are
> staged for distribution for many consumer, that could be offline for
> some days).
>
>
> After the Persistence Store is filled with these Messages and after a
> broker restart we can browse/consume some Queues  _until_ the
> #checkpoint call after 30 seconds.
>
> This call causes the broker to use all available memory and never
> releases it for other tasks such as Queue browse/consume. Internally the
> MessageCursor seems to decide, that there is not enough memory and stops
> delivery of queue content to browsers/consumers.
>
>
> => Is there a way to avoid this behaviour by configuration or is this a
> bug?
>
> The expectation is, that we can consume/browse any queue under all
> circumstances.
>
> Settings below are in production for some time now and several
> recommendations are applied found in the ActiveMQ documentation
> (destination policies, systemUsage, persistence store options etc.)
>
>  - Behaviour is tested with ActiveMQ: 5.11.2, 5.13.0 and 5.5.1.
>  - Memory Settings: Xmx=1024m
>  - Java: 1.8 or 1.7
>  - OS: Windows, MacOS, Linux
>  - PersistenceAdapter: KahaDB or LevelDB
>  - Disc: enough free space (200 GB) and physical memory (16 GB max).
>
> Besides the above mentioned settings we use the following settings for
> the broker (btw: changing the memoryLimit to a lower value like 1mb does
> not change the situation):
>
>     <destinationPolicy>
>         <policyMap>
>             <policyEntries>
>                 <policyEntry queue=">" producerFlowControl="false"
> optimizedDispatch="true" memoryLimit="128mb"
> timeBeforeDispatchStarts="1000">
>                     <dispatchPolicy>
>                         <strictOrderDispatchPolicy />
>                     </dispatchPolicy>
>                     <pendingQueuePolicy>
>                         <storeCursor />
>                     </pendingQueuePolicy>
>                 </policyEntry>
>             </policyEntries>
>         </policyMap>
>     </destinationPolicy>
>     <systemUsage>
>         <systemUsage sendFailIfNoSpace="true">
>             <memoryUsage>
>                 <memoryUsage limit="50 mb" />
>             </memoryUsage>
>             <storeUsage>
>                 <storeUsage limit="80000 mb" />
>             </storeUsage>
>             <tempUsage>
>                 <tempUsage limit="1000 mb" />
>             </tempUsage>
>         </systemUsage>
>     </systemUsage>
>
> If we set the **cursorMemoryHighWaterMark** in the destinationPolicy to
> a higher value like **150** or **600** depending on the difference
> between memoryUsage and the available heap space relieves the situation
> a bit for a workaround, but this is not really an option for production
> systems in my point of view.
>
> Screenie with information from Oracle Mission Control showing those
> ActiveMQTextMessage instances that are never released from memory:
>
> http://goo.gl/EjEixV
>
>
> Cheers
> Klaus
>