You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@qpid.apache.org by Olivier Mallassi <ol...@gmail.com> on 2015/05/20 19:41:43 UTC

[Java Broker] Message Grouping

Hello all

a question regarding message grouping.

In my case, I use .....x-declare: {arguments:
{'qpid.group_header_key':'JMSXGroupID', 'qpid.shared_msg_group':0}}}}

so, looking at the doc (
https://qpid.apache.org/releases/qpid-0.32/java-broker/book/Java-Broker-Management-Managing-Queues.html
)


>
> *Enforce consumption ordering among messages belonging to the same group.
> Consumption ordering means one of two things depending on how the queue has
> been configured.In default mode, a group gets assigned to a single consumer
> for the lifetime of that consumer, and the broker will pass all subsequent
> messages in the group to that consumer.In 'shared groups' mode (which gives
> the same behaviour as the Qpid C++ Broker) the broker enforces a looser
> guarantee, namely that all the currently unacknowledged messages in a group
> are sent to the same consumer, but the consumer used may change over time
> even if the consumers do not. This means that only one consumer can be
> processing messages from a particular group at any given time, however if
> the consumer acknowledges all of its acquired messages then the broker may
> pass the next pending message in that group to a different consumer.*


I am currently using the default mode, to I have the guarantee that given a
group value, as long as the consumer is there, message will be routed to
the same consumer. Am I right?
Has someone a use case for 'shared groups'?


Looking at the code (If I understand well), it looks like we use this class
DefinedGroupMessageGroupManager where there is a _groupMap HashMap
attribute.

Does it mean that if I have one million different groupId, I will end up
with 1 million entries in that Map?  (which is not an issue but good to
know)

Can someone help me finding the classes that dispatch the 'old groupId' to
a new consumer when a consumer is closed?

Regards.

Re: [Java Broker] Message Grouping

Posted by Olivier Mallassi <ol...@gmail.com>.
awesome! thx a lot.
I missed the fact that you "simply" clearAssignement (and not reallocate)
when a consumer leaves.
The reallocation to a new consumer will then be done, if needed, in the
acceptMessage().

Thx.

On Wed, May 20, 2015 at 9:32 PM, Rob Godfrey <ro...@gmail.com>
wrote:

> On 20 May 2015 at 19:41, Olivier Mallassi <ol...@gmail.com>
> wrote:
> > Hello all
> >
> > a question regarding message grouping.
> >
> > In my case, I use .....x-declare: {arguments:
> > {'qpid.group_header_key':'JMSXGroupID', 'qpid.shared_msg_group':0}}}}
> >
> > so, looking at the doc (
> >
> https://qpid.apache.org/releases/qpid-0.32/java-broker/book/Java-Broker-Management-Managing-Queues.html
> > )
> >
> >
> >>
> >> *Enforce consumption ordering among messages belonging to the same
> group.
> >> Consumption ordering means one of two things depending on how the queue
> has
> >> been configured.In default mode, a group gets assigned to a single
> consumer
> >> for the lifetime of that consumer, and the broker will pass all
> subsequent
> >> messages in the group to that consumer.In 'shared groups' mode (which
> gives
> >> the same behaviour as the Qpid C++ Broker) the broker enforces a looser
> >> guarantee, namely that all the currently unacknowledged messages in a
> group
> >> are sent to the same consumer, but the consumer used may change over
> time
> >> even if the consumers do not. This means that only one consumer can be
> >> processing messages from a particular group at any given time, however
> if
> >> the consumer acknowledges all of its acquired messages then the broker
> may
> >> pass the next pending message in that group to a different consumer.*
> >
> >
> > I am currently using the default mode, to I have the guarantee that
> given a
> > group value, as long as the consumer is there, message will be routed to
> > the same consumer. Am I right?
>
> Correct.
>
> > Has someone a use case for 'shared groups'?
> >
>
> So, as above, the "shared groups" is the way that the C++ Broker
> implements message grouping... essentially the use case there is that
> no two messages from the same group can be processed concurrently by
> different consumers.  I have seen use cases for this in the past where
> there wasn't actually any need to tie messages from a particular group
> to a single consumer instance (no state was retained in the consumer)
> but that it was required that messages from the same group were
> processed sequentially - this type of message grouping provides this
> functionality.
>
> >
> > Looking at the code (If I understand well), it looks like we use this
> class
> > DefinedGroupMessageGroupManager where there is a _groupMap HashMap
> > attribute.
> >
> > Does it mean that if I have one million different groupId, I will end up
> > with 1 million entries in that Map?  (which is not an issue but good to
> > know)
> >
>
> So, IIRC, DefinedGroupMessageGroupManager is the "shared groups" case
> - which does have an unbounded map.
>
> For the non-shared group case it uses
> AssignedConsumerMessageGroupManager which does bound the size of the
> group -> consumer map using the queue property maximumDistinctGroups.
> It doesn't look like you can set that if you create the queue via the
> declare args in the client though - you'd have to either use the REST
> API or manually create through the UI (which is really just anther way
> of calling the REST API).  The default number of distinct groups is
> 255.  You can change this broker wide by setting the context value
> queue.maximumDistinctGroups.  You can do this through the Management
> UI, or you could just run with -Dqueue.maximumDistinctGroups=1023 or
> whatever value you like (it rounds up to the next 2^x -1).
>
> > Can someone help me finding the classes that dispatch the 'old groupId'
> to
> > a new consumer when a consumer is closed?
>
> So in the shared groups case the group is closed when there is no
> message of the group currently being processed by a consumer... this
> is managed by the subtract() method of the inner class Group in
> DefinedGroupMessageGroupManager.
>
> In the non-shared group case
>
> void unregisterConsumer(final QueueConsumerImpl consumer) in
> AbstractQueue is called when the consumer is closed.  This then calls
>
> resetSubPointersForGroups(consumer, true);
>
> which in turn calls
>
> _messageGroupManager.clearAssignments(consumer);
>
> which for the AssignedConsumerMessageGroupManager looks like this:
>
> public void clearAssignments(QueueConsumer<?> sub)
> {
>     Iterator<QueueConsumer<?>> subIter = _groupMap.values().iterator();
>     while(subIter.hasNext())
>     {
>         if(subIter.next() == sub)
>         {
>             subIter.remove();
>         }
>     }
> }
>
> which basically iterates over all the groups and if the are assigned
> to that consumer removes that assignment.
>
>
> Hope this helps,
> Rob
>
>
> >
> > Regards.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> For additional commands, e-mail: users-help@qpid.apache.org
>
>

Re: [Java Broker] Message Grouping

Posted by Rob Godfrey <ro...@gmail.com>.
On 20 May 2015 at 19:41, Olivier Mallassi <ol...@gmail.com> wrote:
> Hello all
>
> a question regarding message grouping.
>
> In my case, I use .....x-declare: {arguments:
> {'qpid.group_header_key':'JMSXGroupID', 'qpid.shared_msg_group':0}}}}
>
> so, looking at the doc (
> https://qpid.apache.org/releases/qpid-0.32/java-broker/book/Java-Broker-Management-Managing-Queues.html
> )
>
>
>>
>> *Enforce consumption ordering among messages belonging to the same group.
>> Consumption ordering means one of two things depending on how the queue has
>> been configured.In default mode, a group gets assigned to a single consumer
>> for the lifetime of that consumer, and the broker will pass all subsequent
>> messages in the group to that consumer.In 'shared groups' mode (which gives
>> the same behaviour as the Qpid C++ Broker) the broker enforces a looser
>> guarantee, namely that all the currently unacknowledged messages in a group
>> are sent to the same consumer, but the consumer used may change over time
>> even if the consumers do not. This means that only one consumer can be
>> processing messages from a particular group at any given time, however if
>> the consumer acknowledges all of its acquired messages then the broker may
>> pass the next pending message in that group to a different consumer.*
>
>
> I am currently using the default mode, to I have the guarantee that given a
> group value, as long as the consumer is there, message will be routed to
> the same consumer. Am I right?

Correct.

> Has someone a use case for 'shared groups'?
>

So, as above, the "shared groups" is the way that the C++ Broker
implements message grouping... essentially the use case there is that
no two messages from the same group can be processed concurrently by
different consumers.  I have seen use cases for this in the past where
there wasn't actually any need to tie messages from a particular group
to a single consumer instance (no state was retained in the consumer)
but that it was required that messages from the same group were
processed sequentially - this type of message grouping provides this
functionality.

>
> Looking at the code (If I understand well), it looks like we use this class
> DefinedGroupMessageGroupManager where there is a _groupMap HashMap
> attribute.
>
> Does it mean that if I have one million different groupId, I will end up
> with 1 million entries in that Map?  (which is not an issue but good to
> know)
>

So, IIRC, DefinedGroupMessageGroupManager is the "shared groups" case
- which does have an unbounded map.

For the non-shared group case it uses
AssignedConsumerMessageGroupManager which does bound the size of the
group -> consumer map using the queue property maximumDistinctGroups.
It doesn't look like you can set that if you create the queue via the
declare args in the client though - you'd have to either use the REST
API or manually create through the UI (which is really just anther way
of calling the REST API).  The default number of distinct groups is
255.  You can change this broker wide by setting the context value
queue.maximumDistinctGroups.  You can do this through the Management
UI, or you could just run with -Dqueue.maximumDistinctGroups=1023 or
whatever value you like (it rounds up to the next 2^x -1).

> Can someone help me finding the classes that dispatch the 'old groupId' to
> a new consumer when a consumer is closed?

So in the shared groups case the group is closed when there is no
message of the group currently being processed by a consumer... this
is managed by the subtract() method of the inner class Group in
DefinedGroupMessageGroupManager.

In the non-shared group case

void unregisterConsumer(final QueueConsumerImpl consumer) in
AbstractQueue is called when the consumer is closed.  This then calls

resetSubPointersForGroups(consumer, true);

which in turn calls

_messageGroupManager.clearAssignments(consumer);

which for the AssignedConsumerMessageGroupManager looks like this:

public void clearAssignments(QueueConsumer<?> sub)
{
    Iterator<QueueConsumer<?>> subIter = _groupMap.values().iterator();
    while(subIter.hasNext())
    {
        if(subIter.next() == sub)
        {
            subIter.remove();
        }
    }
}

which basically iterates over all the groups and if the are assigned
to that consumer removes that assignment.


Hope this helps,
Rob


>
> Regards.

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org