You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Kevin Burton <bu...@spinn3r.com> on 2016/01/24 21:16:45 UTC

Message group checkpoints...(processing N messages with one task).

I have a pattern which I think I need advice on...

I have three tasks... each a type of message consumer.

Let's call them A, B, an C.

A runs once,, creates 15 messages, sends them to B... then B process these
messages then generates 15 new messages.

However, they need to be combined into a group, and sent to C all at once,
in one composite message.

So its similar to map/reduce in a way in that C should execute once with a
block of these 15 messages.

Conceptually I'm calling them (message group checkpoints).. but I'm
wondering if there's already a more formal name for this concept.

I'm not sure the best way to handle this with ActiveMQ.

One strategy is that I could have one queue per C tasks (the final tasks)
and then have C running and consuming them one at a time, and then
performing the execution (and message commit) once it receives all 15
messages.

I HAVE to get all the messages until I can make a decision, I can't stream
process them unfortunately because the algorithm needs all data points.

I could use a database.. but the problem there is that it would incur some
database and Cassandra (in our case) doesn't handle this queue pattern very
well.

Another idea is to use a series of queues ... say aggregation_0,
aggregation_1, aggregation_2,...

then I receive these messages into the first queue (aggregation_0), then
sort it, if any of the groups are finished I send them on to the final
destination task.  If any are unfinished then I overflow them on to
aggregation1 (pre-sorted)...

Thoughts?

-- 

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
<https://plus.google.com/102718274791889610666/posts>

Re: Message group checkpoints...(processing N messages with one task).

Posted by Tim Bain <tb...@alumni.duke.edu>.
The issue with selectors is because the broker can't do random access into
a persistent store, so there's a cursor that reads the earliest N
messages.  Within that (large) batch, things like priorities, selectors,
and message groups are respected.  But if the cursor is full of messages of
one type but the full store contains messages of other types, the other
messages can't be evaluated until enough messages of the first type are
consumed to cause the other messages to be read into the cursor (because
they're now in the first N).  What's implemented is a trade-off, allowing
the broker to only read each message once and linearly (which will be more
efficient for some message stores) but ultimately causing more problems
than I think the performance and the simplicity are worth.  One partial
workaround is to make the cursor bigger, but since it still has to fit in
memory it's not a full cure.  The other is to have enough consumers who are
online at any time to ensure that messages never back up beyond what the
cursor can hold, which is possible for the use cases you've previously
described but might cost you money for more hardware.

Using message groups will probably be more flexible than using hard-coded
buckets, though the buckets will mean each cursor is less likely to get
full of messages for only a few consumers.  Also, if you search this list
for threads about message groups you'll see a feature suggestion for
rebalancing groups across consumers which might be important at the volumes
you typically handle.  I don't think anyone has tackled it yet, though if
you discover you have to have it and you implement something and are
willing to contribute it back, we'd gladly take it.

Rolling back your messages till you either have a full batch or you DLQ
them seems like not an ideal solution.  You'll process the message
repeatedly (so it's inefficient) and your decision to give up is based on
an arbitrary number of attempts, not whether you have space to keep holding
the message, how much time has elapsed, etc.  Better to consume once but
not ack till the whole batch has arrived.  The only problem with
INDIVIDUAL_ACK is that there is no way to nack a message individually, so
your only option is to roll back the consumer and nack everything all at
once, including stuff you don't actually want to nack.  I submitted an
enhancement request for individual nacks (
https://issues.apache.org/jira/browse/AMQ-6098,
http://activemq.2283324.n4.nabble.com/How-to-use-INDIVIDUAL-ACKNOWLEDGE-mode-td4704793.html),
but Tim Bish was unconvinced that we should implement the capability on the
grounds that it's not required by the JMS spec (we're allowed to extend the
spec in non-standard ways, so that's not a convincing argument) and to the
best of my knowledge no one (him, me, or anyone else) has made any attempt
to implement it since.  But with that feature implemented, I think
INDIVIDUAL_ACK plus message groups plus enough consumers to keep all
messages in memory is the way to go; without it, things get much harder,
and maybe you need one destination per message group with a Camel route to
pull messages from a queue and move them to the right per-group destination?

Tim

Kevin Burton wrote:

I think part of this is that I'm unsure how some of the ActiveMQ internals
work here.

For example, I've run into problems with message selectors where they don't
operate properly if one message subset produces fewer messages than the
other message subsets due to the way the 'head of queue' system works in
ActiveMQ.  the memory gets choked out and so you end up with some subset of
your messages having higher latencies.

Another thing I was considering was building sort of a bucket system
similar to the way storm operates or DHTs operate where we pre-allocate
like 20k 'buckets' and put messages into these buckets.  This way a
consumer only sees the buckets its responsible and some smaller amount of
volume.  It can then group the messages in bulk when it receives them all.

Another way to handle the incomple groups, IE if some of them get dropped
.. is to keep rolling them back until they go into the DLQ...

On Sun, Jan 24, 2016 at 2:06 PM, Tim Bain <tb...@alumni.duke.edu> wrote:

> I'd use message groups for the messages produced by A, ensuring that all
15
> go to a single B consumer.  Then B can use CLIENT_ACK or INDIVIDUAL_ACK
> mode to only ack the messages when all 15 have been received (holding them
> in memory and unacknowledged until all 15 arrive), and then B can publish
a
> single message to be consumed by C that encompasses all of the information
> from the 15 messages.  This only requires two queues (AB for messages
> between A and B, and BC for messages between B and C), so it'll be simpler
> to manage than having multiple aggregation_N queues.
>
> You'll want to consider what happens if you never get all 15 messages (I'd
> eventually write them into a database so the batch can be reassembled
later
> if they eventually show up), as well as how to do your acking if a single
B
> consumer can get interleaved message groups (which is why I'd suggest
> considering INDIVIDUAL_ACK mode).  Also, although I agree that doing all
of
> this in a database isn't likely to be performant, I think any database
> (Cassandra, Oracle, whatever) would be able to handle just your
> incomplete-groups use case, since the percentage of groups that end up
> routed through that mechanism is presumably small.
>
> All of this assumes that most message groups will be completed quickly
> enough that it's reasonable to hold all messages from most open groups in
> the memory (and the prefetch buffer) of one or another of your B
consumers,
> and that the number of groups that would have to be overflowed.  If that's
> not reasonable, this gets harder, but you could potentially write the
> messages to some type of database but have the consumer just keep track of
> which messages it has (so you're storing a single bit-mapped integer for
> each message group instead of storing the full message content), and then
> read them back in when the 15th one arrives.  That means the database
> doesn't have to query to know which batches are complete, only be capable
> of retrieving the messages for a single group on demand.
>
> Tim
>
> On Sun, Jan 24, 2016 at 1:16 PM, Kevin Burton <bu...@spinn3r.com> wrote:
>
> > I have a pattern which I think I need advice on...
> >
> > I have three tasks... each a type of message consumer.
> >
> > Let's call them A, B, an C.
> >
> > A runs once,, creates 15 messages, sends them to B... then B process
> these
> > messages then generates 15 new messages.
> >
> > However, they need to be combined into a group, and sent to C all at
> once,
> > in one composite message.
> >
> > So its similar to map/reduce in a way in that C should execute once with
> a
> > block of these 15 messages.
> >
> > Conceptually I'm calling them (message group checkpoints).. but I'm
> > wondering if there's already a more formal name for this concept.
> >
> > I'm not sure the best way to handle this with ActiveMQ.
> >
> > One strategy is that I could have one queue per C tasks (the final
tasks)
> > and then have C running and consuming them one at a time, and then
> > performing the execution (and message commit) once it receives all 15
> > messages.
> >
> > I HAVE to get all the messages until I can make a decision, I can't
> stream
> > process them unfortunately because the algorithm needs all data points.
> >
> > I could use a database.. but the problem there is that it would incur
> some
> > database and Cassandra (in our case) doesn't handle this queue pattern
> very
> > well.
> >
> > Another idea is to use a series of queues ... say aggregation_0,
> > aggregation_1, aggregation_2,...
> >
> > then I receive these messages into the first queue (aggregation_0), then
> > sort it, if any of the groups are finished I send them on to the final
> > destination task.  If any are unfinished then I overflow them on to
> > aggregation1 (pre-sorted)...
> >
> > Thoughts?
> >
> > --
> >
> > We’re hiring if you know of any awesome Java Devops or Linux Operations
> > Engineers!
> >
> > Founder/CEO Spinn3r.com
> > Location: *San Francisco, CA*
> > blog: http://burtonator.wordpress.com
> > … or check out my Google+ profile
> > <https://plus.google.com/102718274791889610666/posts>
> >
>

--

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
<https://plus.google.com/102718274791889610666/posts>

Re: Message group checkpoints...(processing N messages with one task).

Posted by Kevin Burton <bu...@spinn3r.com>.
I think part of this is that I'm unsure how some of the ActiveMQ internals
work here.

For example, I've run into problems with message selectors where they don't
operate properly if one message subset produces fewer messages than the
other message subsets due to the way the 'head of queue' system works in
ActiveMQ.  the memory gets choked out and so you end up with some subset of
your messages having higher latencies.

Another thing I was considering was building sort of a bucket system
similar to the way storm operates or DHTs operate where we pre-allocate
like 20k 'buckets' and put messages into these buckets.  This way a
consumer only sees the buckets its responsible and some smaller amount of
volume.  It can then group the messages in bulk when it receives them all.

Another way to handle the incomple groups, IE if some of them get dropped
.. is to keep rolling them back until they go into the DLQ...

On Sun, Jan 24, 2016 at 2:06 PM, Tim Bain <tb...@alumni.duke.edu> wrote:

> I'd use message groups for the messages produced by A, ensuring that all 15
> go to a single B consumer.  Then B can use CLIENT_ACK or INDIVIDUAL_ACK
> mode to only ack the messages when all 15 have been received (holding them
> in memory and unacknowledged until all 15 arrive), and then B can publish a
> single message to be consumed by C that encompasses all of the information
> from the 15 messages.  This only requires two queues (AB for messages
> between A and B, and BC for messages between B and C), so it'll be simpler
> to manage than having multiple aggregation_N queues.
>
> You'll want to consider what happens if you never get all 15 messages (I'd
> eventually write them into a database so the batch can be reassembled later
> if they eventually show up), as well as how to do your acking if a single B
> consumer can get interleaved message groups (which is why I'd suggest
> considering INDIVIDUAL_ACK mode).  Also, although I agree that doing all of
> this in a database isn't likely to be performant, I think any database
> (Cassandra, Oracle, whatever) would be able to handle just your
> incomplete-groups use case, since the percentage of groups that end up
> routed through that mechanism is presumably small.
>
> All of this assumes that most message groups will be completed quickly
> enough that it's reasonable to hold all messages from most open groups in
> the memory (and the prefetch buffer) of one or another of your B consumers,
> and that the number of groups that would have to be overflowed.  If that's
> not reasonable, this gets harder, but you could potentially write the
> messages to some type of database but have the consumer just keep track of
> which messages it has (so you're storing a single bit-mapped integer for
> each message group instead of storing the full message content), and then
> read them back in when the 15th one arrives.  That means the database
> doesn't have to query to know which batches are complete, only be capable
> of retrieving the messages for a single group on demand.
>
> Tim
>
> On Sun, Jan 24, 2016 at 1:16 PM, Kevin Burton <bu...@spinn3r.com> wrote:
>
> > I have a pattern which I think I need advice on...
> >
> > I have three tasks... each a type of message consumer.
> >
> > Let's call them A, B, an C.
> >
> > A runs once,, creates 15 messages, sends them to B... then B process
> these
> > messages then generates 15 new messages.
> >
> > However, they need to be combined into a group, and sent to C all at
> once,
> > in one composite message.
> >
> > So its similar to map/reduce in a way in that C should execute once with
> a
> > block of these 15 messages.
> >
> > Conceptually I'm calling them (message group checkpoints).. but I'm
> > wondering if there's already a more formal name for this concept.
> >
> > I'm not sure the best way to handle this with ActiveMQ.
> >
> > One strategy is that I could have one queue per C tasks (the final tasks)
> > and then have C running and consuming them one at a time, and then
> > performing the execution (and message commit) once it receives all 15
> > messages.
> >
> > I HAVE to get all the messages until I can make a decision, I can't
> stream
> > process them unfortunately because the algorithm needs all data points.
> >
> > I could use a database.. but the problem there is that it would incur
> some
> > database and Cassandra (in our case) doesn't handle this queue pattern
> very
> > well.
> >
> > Another idea is to use a series of queues ... say aggregation_0,
> > aggregation_1, aggregation_2,...
> >
> > then I receive these messages into the first queue (aggregation_0), then
> > sort it, if any of the groups are finished I send them on to the final
> > destination task.  If any are unfinished then I overflow them on to
> > aggregation1 (pre-sorted)...
> >
> > Thoughts?
> >
> > --
> >
> > We’re hiring if you know of any awesome Java Devops or Linux Operations
> > Engineers!
> >
> > Founder/CEO Spinn3r.com
> > Location: *San Francisco, CA*
> > blog: http://burtonator.wordpress.com
> > … or check out my Google+ profile
> > <https://plus.google.com/102718274791889610666/posts>
> >
>



-- 

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
<https://plus.google.com/102718274791889610666/posts>

Re: Message group checkpoints...(processing N messages with one task).

Posted by Tim Bain <tb...@alumni.duke.edu>.
I'd use message groups for the messages produced by A, ensuring that all 15
go to a single B consumer.  Then B can use CLIENT_ACK or INDIVIDUAL_ACK
mode to only ack the messages when all 15 have been received (holding them
in memory and unacknowledged until all 15 arrive), and then B can publish a
single message to be consumed by C that encompasses all of the information
from the 15 messages.  This only requires two queues (AB for messages
between A and B, and BC for messages between B and C), so it'll be simpler
to manage than having multiple aggregation_N queues.

You'll want to consider what happens if you never get all 15 messages (I'd
eventually write them into a database so the batch can be reassembled later
if they eventually show up), as well as how to do your acking if a single B
consumer can get interleaved message groups (which is why I'd suggest
considering INDIVIDUAL_ACK mode).  Also, although I agree that doing all of
this in a database isn't likely to be performant, I think any database
(Cassandra, Oracle, whatever) would be able to handle just your
incomplete-groups use case, since the percentage of groups that end up
routed through that mechanism is presumably small.

All of this assumes that most message groups will be completed quickly
enough that it's reasonable to hold all messages from most open groups in
the memory (and the prefetch buffer) of one or another of your B consumers,
and that the number of groups that would have to be overflowed.  If that's
not reasonable, this gets harder, but you could potentially write the
messages to some type of database but have the consumer just keep track of
which messages it has (so you're storing a single bit-mapped integer for
each message group instead of storing the full message content), and then
read them back in when the 15th one arrives.  That means the database
doesn't have to query to know which batches are complete, only be capable
of retrieving the messages for a single group on demand.

Tim

On Sun, Jan 24, 2016 at 1:16 PM, Kevin Burton <bu...@spinn3r.com> wrote:

> I have a pattern which I think I need advice on...
>
> I have three tasks... each a type of message consumer.
>
> Let's call them A, B, an C.
>
> A runs once,, creates 15 messages, sends them to B... then B process these
> messages then generates 15 new messages.
>
> However, they need to be combined into a group, and sent to C all at once,
> in one composite message.
>
> So its similar to map/reduce in a way in that C should execute once with a
> block of these 15 messages.
>
> Conceptually I'm calling them (message group checkpoints).. but I'm
> wondering if there's already a more formal name for this concept.
>
> I'm not sure the best way to handle this with ActiveMQ.
>
> One strategy is that I could have one queue per C tasks (the final tasks)
> and then have C running and consuming them one at a time, and then
> performing the execution (and message commit) once it receives all 15
> messages.
>
> I HAVE to get all the messages until I can make a decision, I can't stream
> process them unfortunately because the algorithm needs all data points.
>
> I could use a database.. but the problem there is that it would incur some
> database and Cassandra (in our case) doesn't handle this queue pattern very
> well.
>
> Another idea is to use a series of queues ... say aggregation_0,
> aggregation_1, aggregation_2,...
>
> then I receive these messages into the first queue (aggregation_0), then
> sort it, if any of the groups are finished I send them on to the final
> destination task.  If any are unfinished then I overflow them on to
> aggregation1 (pre-sorted)...
>
> Thoughts?
>
> --
>
> We’re hiring if you know of any awesome Java Devops or Linux Operations
> Engineers!
>
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> <https://plus.google.com/102718274791889610666/posts>
>