Posted to dev@qpid.apache.org by Robert Godfrey <ro...@gmail.com> on 2011/06/27 18:25:18 UTC

Re: Proposed QIP: Support for Message Grouping in the broker.

So, in general this looks good, and is something we could support in the
Java Broker... My one comment would be that I think the more standard
behaviour for a "Group" is to ensure all messages in the same group are
delivered (in order) to the same consumer, with the notion of blocking
delivery of the nth + 1 element of the group until the nth element is
acknowledged seeming more like an alternate behaviour.

JMS (half) specifies two message properties for group ID and group Sequence
Number... AMQP v1.0 is defining something along the same lines (and as such,
making the attribute containing the group id configurable would make less
sense in an AMQP v1.0 context).  There are some interesting potential
behaviours there for a queue to ensure that messages within a group are not
forwarded "out of order" or with gaps in the sequence...

On the Java side I was planning to implement the more standard Group
behaviour (i.e. messages in a group are all sent to the same consumer), but
could extend that to also do the "don't send until the last message has been
consumed" feature... we should just work out a clear way of defining
these features and allowing our users to correctly specify which behaviour
it is they desire.
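
A minimal sketch of the "same consumer" behaviour, in Python for brevity. It
is not the Java Broker's implementation: the class name StickyGroupQueue, the
consumer names and the round-robin choice of consumer for a new group are all
assumptions made for illustration.

```python
from collections import defaultdict
from itertools import cycle


class StickyGroupQueue:
    """Toy model: every message of a group goes to the consumer that
    received the first message of that group (hypothetical names)."""

    def __init__(self, consumers):
        self._next_consumer = cycle(consumers)  # assumed round-robin for new groups
        self._owner = {}                        # group id -> consumer name
        self.delivered = defaultdict(list)      # consumer name -> messages, in order

    def deliver(self, group_id, body):
        # The first message of a group picks the owner; later ones reuse it.
        if group_id not in self._owner:
            self._owner[group_id] = next(self._next_consumer)
        consumer = self._owner[group_id]
        self.delivered[consumer].append((group_id, body))
        return consumer


queue = StickyGroupQueue(["c1", "c2"])
for group_id, body in [("A", "add"), ("B", "add"), ("A", "remove"), ("B", "remove")]:
    queue.deliver(group_id, body)
```

Each group's messages stay in order because a single consumer sees all of
them, at the cost of not rebalancing a group once it has an owner.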

Cheers,
Rob

On 24 June 2011 17:07, Ken Giusti <kg...@redhat.com> wrote:

>
> Message Groups
>
> Status
>
> Draft
>
> Summary
>
> This document describes a new feature that would allow a message producer
> to enforce the order in which the data from a set of related messages are
> processed by consumers.
>
> Problem
>
> While the broker currently employs a strict FIFO queuing model, it does
> not guarantee that messages will be processed in that order when there are
> multiple consumers subscribed to a queue. Therefore, it is impossible for a
> producer to enforce a strict ordering to the processing of messages, even
> though it may be required by the application.
>
> For example, assume we have a shopping application that manages items in a
> virtual shopping cart. A user may add an item to their shopping cart, then
> change their mind and remove it. If the application sends an "add" message
> to the broker, immediately followed by a "remove" message, they will be
> queued in the proper order - "add", then "remove".
>
> However, if there are multiple consumers, it is possible that once a
> consumer acquires the "add" message, a different consumer may acquire the
> "remove" message. This allows both messages to be processed in parallel,
> which could result in the "remove" operation being performed before the
> "add" operation.
>
> Solution
>
> This problem can be solved by allowing a producer to mark a group of
> messages as being related, and having the broker enforce strict ordering of
> consumption of messages belonging to that group.
>
> Specifically, for any given group of queued messages, the broker allows
> only the first message in the group to be available for consumption by a
> subscriber. The broker blocks the remaining messages belonging to that group
> from consumption by any subscriber. Once the broker has completed the
> transfer of responsibility for that first message to a consumer, it then
> allows the next message in the group to be available for consumption while
> all other messages in that group remain blocked.
>
> In order to guarantee messages are not processed in parallel, the
> application has to ensure that it has completely processed the data in a
> received message before accepting that message, as described in Section
> 2.6.2. Transfer of Responsibility, of the AMQP-0.10 specification.
>
> Note well that distinct message groups would not block each other. For
> example, assume a queue contains messages from two different message groups
> - say group "A" and group "B" - and they are enqueued such that "A"'s
> messages are in front of "B". If the first message of group "A" is in the
> process of being consumed, then the remaining "A" messages are blocked, but
> the first message of the "B" group is available for consumption - even
> though it is "behind" group "A" in the queue.
>
> This feature could be implemented by having the message producer set a
> group identifier in the application headers of a message. Messages belonging
> to the same group would have the same identifier value. The destination
> queue would be configured with the key name of the application header used
> for group identification. When a message is delivered to the queue, the
> broker would check the application headers for the group identification key.
> It would then classify which group the message belonged to based on the
> value of the key. The queue subscriber logic on the broker would need to be
> modified to enforce the rules for accessing grouped messages (as described
> above).
>
> Rationale
>
> The solution described above allows the message producer to enforce a
> strict order to the processing of its messages by the consumers. The broker
> can support such a feature with a minimum of configuration effort (merely
> configuring the header key used per queue).
>
>    * Goal: allow dynamic values for the group identifiers (no
> preconfiguration of identifiers necessary)
>    * Goal: the number of message groups "in flight" should only be limited
> by the available resources (no need to configure a hard limit).
>    * Goal: the individual messages from a single message group may be
> processed by different consumers, as long as the strict order is observed.
>    * Goal: manageability: visibility into the message groups currently on a
> given queue
>    * Goal: manageability: purge, move, reroute messages at the group level.
>    * Goal: "sticky" subscribers - (optional) ensure that all messages from
> a given group are consumed by the same client.
>
> Implementation Notes
>
>    * Queues: support configuration of the group identifier header key.
>    * Messages: provide access to group identifier.
>    * Queues: identify head message of next available message group.
>    * Queues: block the trailing messages in a given message group from
> being consumed.
>    * Consumers: track the message group of the currently acquired
> message(s).
>
> Consequences
>
>    * Development: No changes to the development process.
>    * Release: No changes to the release process.
>    * Documentation: User documentation will be needed to explain the
> feature, and the steps to configure and manage the new feature.
>    * Configuration: Yes: per-queue group identifier header key is
> configurable. Queue state with regard to in-flight message groups needs to
> be visible. Additional methods to purge/move/reroute a message group.
>    * Compatibility: Unlikely - new feature that would need to be enabled
> manually. Applications wishing to use the feature would need to implement a
> message grouping policy, and ensure the processing of received message data
> is completed before signaling acceptance of the message.
>
> References
>
>    * None.
>
> Contributor-in-Charge
>
> Kenneth Giusti, kgiusti@apache.org
>
> Contributors
>
>    * Ted Ross, tross@redhat.com
>
> Version
>
> 0.1
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>
>

Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
On 06/27/2011 12:41 PM, Gordon Sim wrote:
> On 06/27/2011 05:25 PM, Robert Godfrey wrote:
>> So, in general this looks good, and is something we could support in the
>> Java Broker... My one comment would be that I think the more standard
>> behaviour for a "Group" is to ensure all messages in the same group are
>> delivered (in order) to the same consumer, with the notion of blocking
>> delivery of the nth + 1 element of the group until the nth element is
>> acknowledged seeming more like an alternate behaviour.
>
> I don't see a lot of value in blocking delivery of the nth +1 element until the
> nth element is acknowledged. I think the important thing is to deliver the
> messages within a group in order in spite of having competing consumers. You can
> do that simply by ensuring that the nth + 1 message here goes to the same
> consumer as the nth message.
>
> I agree that a common approach is for a group to be 'sticky' to a given consumer.
>
> However I can also see value in a scheme where that stickiness ends not when the
> consumer is closed, but when there are no outstanding in-doubt messages for the
> group. In the case where the requirement is simply in-order processing of
> messages in a group this might allow for more adaptive load balancing across
> parallel consumers.


The sticky approach seems more intuitively obvious to me, but that makes me 
wonder how the non-sticky alternative ever came up in the first place if it's 
not a requirement for somebody. E.g. WebLogic
http://download.oracle.com/docs/cd/E12840_01/wls/docs103/jms/uoo.html phrases 
this in a non-sticky way. Of course the sticky approach is a legitimate 
implementation since "will be sent to any consumer" is not violated if you 
happen to keep choosing the same consumer, but I wonder if the load balancing is 
something that some users would want.



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 06/28/2011 11:56 AM, Alan Conway wrote:
> What happens if the consumer dies or cancels mid-group in this case? Do we
> - replay the group from the beginning to another consumer?
> - drop the rest of the group?
> - continue sending to a new consumer from where the first consumer left
> off?
> - In which case do we provide a flag to let the new consumer know it's
> starting mid-group?

I would say it is business as usual. Any unacknowledged messages are 
redelivered to another available consumer. However the relative ordering 
between the messages in a group should not be lost.

In case (3) the assumption is that the application will only acknowledge 
after receiving the final message in the group, so the group will be 
acknowledged as a group.

At present the C++ broker does not support consumer identity that
persists across loss of session. That I think is orthogonal to this 
however. (If it did then under requirement 2 you would maintain the 
association of group to consumer identity and not redeliver those 
messages until the consumer re-subscribed).
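
A small sketch of "business as usual" redelivery, assuming the queue is
modelled as a Python deque and that a failed consumer's unacknowledged
messages are returned to the head of the queue. The function name
requeue_unacknowledged and the (group, sequence) tuples are hypothetical.

```python
from collections import deque


def requeue_unacknowledged(queue, unacked):
    """Return a failed consumer's unacknowledged messages to the head of
    the queue, keeping their original relative order."""
    # deque.extendleft() reverses its argument, so reverse it first.
    queue.extendleft(reversed(unacked))
    return queue


# ("A", 3) and ("B", 1) are still queued; the dead consumer held A1 and A2.
pending = deque([("A", 3), ("B", 1)])
lost = [("A", 1), ("A", 2)]
requeue_unacknowledged(pending, lost)
```

After the call the next available consumer sees A1, A2, A3 in their original
order, so the relative ordering within the group is not lost.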



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
On 06/28/2011 06:39 AM, Gordon Sim wrote:
> On 06/28/2011 09:57 AM, Marnie McCormack wrote:
>> In JMS, the sequence id would be used to maintain order, but the expectation
>> is that the group are processed as a whole on receipt of the final message
>> in the group rather than simply shared out between consumers and processed
>> in order.
>
> I think that is a third variation (further restriction really) on the use case.
> I.e. where message group is used to piece together a large unit from discrete
> messages.
>
> So I think the requirements are:
>
> (1) message group must always be processed in order
>
> (2) all messages in a group are processed by the same consumer
>
> (3) the consumer will only complete processing having received the 'final'
> message in the group
>

What happens if the consumer dies or cancels mid-group in this case? Do we
- replay the group from the beginning to another consumer?
- drop the rest of the group?
- continue sending to a new consumer from where the first consumer left off?
   - In which case do we provide a flag to let the new consumer know it's 
starting mid-group?

> Where (3) implies (2) and (2) implies (1).
>
> The third requirement essentially rules out an implementation of the first
> requirement that refuses to deliver the next message in the group until the
> previous message is accepted.
>
> The second requirement is important where there is a desire to contain all the
> necessary state for processing a group of messages within a single consumer. I
> agree that is important and should be supported. However I think it is not
> always required and it should be possible to drop that if not needed as that
> could lead to more adaptive load-balancing while meeting the first requirement.
>
> So I think the configuration involved is:
>
> (a) identification of the header that signifies a group within a given queue,
> which implicitly enables requirement (1)
>
> (b) an indication that 'stickiness' of group to consumer is required (beyond
> simply meeting the first requirement)
>
> I think we can avoid the need for any explicit configuration of requirement (3).
> Refusing to deliver the next message in the group until the previous message is
> accepted is in my view unnecessary to meet (1) and could be emulated more or
> less by controlling prefetch.
>



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by mick <mg...@redhat.com>.
On Tue, 2011-06-28 at 09:34 -0400, Ken Giusti wrote:


> 1) I'd like to rename the QIP - is there a better term to use for the "non-sticky" case than "Message Groups"?
> 

If "Message Group" implies stickiness, how about "Message Thread"?
That implies only ordering -- and may or may not be stuck to one
consumer.







Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
On 06/28/2011 04:10 PM, mick wrote:
> On Tue, 2011-06-28 at 15:51 +0100, Gordon Sim wrote:
>
>>
>> E.g. imagine the group relates to some real world object being
>> modelled
>> and each message describes an update
>
>
> To me that situation seems like it should be modeled as a queue.
>
> I think it would be very worthwhile in this discussion to figure out at
> what point we feel that a customer ought to opt for a queue rather than
> a message group.
>
>

I think the significant difference is addressing. Consumers can subscribe to a 
single queue and process multiple message groups _without knowing the names of 
all the message groups in advance_. A multiple queue approach requires that the 
consumers know the names of all the queues they might be interested in. Of 
course we could use multiple queues internally in the implementation but the 
consumer wants to see just one named entity it can subscribe to for all the 
traffic it cares about.



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
On 07/25/2011 10:03 AM, Gordon Sim wrote:
> On 07/25/2011 02:46 PM, Alan Conway wrote:
>> Response in line
>>
>> [snip]
>>> Ah, yes - sorry. The lifetime of the group's state depends on the type
>>> of "Policy" that is being used (see below). For the "Sequenced
>>> Consumers" policy - which is what I was thinking of in my last reply -
>>> the broker doesn't need to maintain state across all messages of the
>>> group. With that policy, the state can be dropped once there are no
>>> more messages for that group present in the broker.
>>>
>>
>> Again I don't get that. You're putting group boundaries at unpredictable
>> arbitrary points depending on relative speed of producer/consumer. The
>> producer may send the first N messages of a group containing N+M
>> messages, then the consumer consumes the first N messages. Now the queue
>> is empty so the next M messages are considered a new group, where N is
>> impossible to predict in advance. So you randomly cut up the groups.
>
> Cutting up the groups doesn't matter though. The requirement here is only that
> the messages be processed in order.

Of course.

> We only need to hold state regarding groups in order to prevent delivery of
> messages to one consumer when earlier messages of the same group have been given
> to another consumer who has not yet indicated that they have been processed.

Right, and for those groups we can use the termination of the consumer as the 
end of the group. It all fits.



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 07/25/2011 02:46 PM, Alan Conway wrote:
> Response in line
>
> [snip]
>> Ah, yes - sorry. The lifetime of the group's state depends on the type
>> of "Policy" that is being used (see below). For the "Sequenced
>> Consumers" policy - which is what I was thinking of in my last reply -
>> the broker doesn't need to maintain state across all messages of the
>> group. With that policy, the state can be dropped once there are no
>> more messages for that group present in the broker.
>>
>
> Again I don't get that. You're putting group boundaries at unpredictable
> arbitrary points depending on relative speed of producer/consumer. The
> producer may send the first N messages of a group containing N+M
> messages, then the consumer consumes the first N messages. Now the queue
> is empty so the next M messages are considered a new group, where N is
> impossible to predict in advance. So you randomly cut up the groups.

Cutting up the groups doesn't matter though. The requirement here is 
only that the messages be processed in order.

We only need to hold state regarding groups in order to prevent delivery 
of messages to one consumer when earlier messages of the same group have 
been given to another consumer who has not yet indicated that they have 
been processed.
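
A rough sketch of that state-keeping, in Python for brevity. It assumes the
broker tracks, per group, only which consumer currently holds unacknowledged
messages and how many; the class and method names are hypothetical and this
is not the broker's actual code.

```python
from collections import deque


class SequencedGroupQueue:
    """Toy model: a group is blocked for other consumers only while one
    consumer holds unacknowledged messages from it."""

    def __init__(self):
        self._messages = deque()  # (group_id, body) in enqueue order
        self._owner = {}          # group_id -> consumer holding unacked messages
        self._outstanding = {}    # group_id -> delivered-but-unacked count

    def enqueue(self, group_id, body):
        self._messages.append((group_id, body))

    def fetch(self, consumer):
        """Return the first message whose group is free or already held by
        this consumer, or None if every queued group is blocked."""
        for index, (group_id, body) in enumerate(self._messages):
            if self._owner.get(group_id, consumer) == consumer:
                del self._messages[index]
                self._owner[group_id] = consumer
                self._outstanding[group_id] = self._outstanding.get(group_id, 0) + 1
                return group_id, body
        return None

    def acknowledge(self, group_id):
        """Drop the group's state once nothing is outstanding for it."""
        self._outstanding[group_id] -= 1
        if self._outstanding[group_id] == 0:
            del self._outstanding[group_id]
            del self._owner[group_id]


q = SequencedGroupQueue()
for message in [("A", "add"), ("A", "remove"), ("B", "add")]:
    q.enqueue(*message)

first = q.fetch("c1")   # c1 takes the head of group A
second = q.fetch("c2")  # group A is blocked for c2, so it gets group B
third = q.fetch("c2")   # only A's second message is left, still blocked
q.acknowledge("A")      # c1 finishes; group A's state is dropped
fourth = q.fetch("c2")  # the rest of group A may now go to c2
```

Note that no state survives for a group once its outstanding count reaches
zero, which is why cutting a long group into pieces does not break ordering.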

>> For the other proposed policy - Exclusive Consumer - then the group
>> state would be associated with a particular Consumer instance, and
>> remain present as long as the Consumer exists. So in that case, yes -
>> there is the potential that an unbounded number of group states could
>> exist without some kind of reclaim strategy. An end-of-group marker
>> could be used, but that wouldn't prevent a DOS by a misbehaving app
>> that creates endless groups w/o setting the end marker. Perhaps a hard
>> max-groups-per-Consumer limit, or a TTL for idle groups?
>
> I'm not so worried about that, it's no worse than the fact that a client
> can create a huge number of sessions or consumers etc. Solving that
> family of DOS attacks is a separate issue I think.

I agree. I'd leave worrying about this for a later stage.



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
Response in line

[snip]
>>>
>>> In this case, it probably would be sufficient to maintain a counter
>>> in the group's state. I'm assuming that we'll need a group lookup on
>>> message arrival to find that state (or create it on the first
>>> received message). Increment the counter when a message in that
>>> group is enqueued, decrement when the message is dequeued. If
>>> decremented to zero, delete the group. Performance impact of such an
>>> approach is TBD, of course - probably need to pool the state objects
>>> to limit memory thrashing, etc....
>>>
>>
>> That doesn't sound right. The first N messages of a group could be dequeued
>> before the N+1 message is enqueued, which would put your counter to 0 before
>> the entire group is processed. I think we need an explicit end of group
>> marker on the last message in the group to allow the user to close a group in
>> a scenario with lots of dynamic group ids. You could ignore this in cases
>> where there is a small fixed set of groups that are used over a long time.
>>
>
>
> Ah, yes - sorry.   The lifetime of the group's state depends on the type of "Policy" that is being used (see below).  For the "Sequenced Consumers" policy - which is what I was thinking of in my last reply - the broker doesn't need to maintain state across all messages of the group.  With that policy, the state can be dropped once there are no more messages for that group present in the broker.
>

Again I don't get that. You're putting group boundaries at unpredictable
arbitrary points depending on relative speed of producer/consumer. The producer
may send the first N messages of a group containing N+M messages, then the
consumer consumes the first N messages. Now the queue is empty so the next M
messages are considered a new group, where N is impossible to predict in
advance. So you randomly cut up the groups.
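
To make the scenario concrete, here is a small Python sketch of the
enqueue/dequeue counter proposed earlier in the thread. The class
GroupCounter and its "deleted" log are hypothetical, added only to show when
the group's state would be dropped.

```python
class GroupCounter:
    """Toy model of the per-group counter: increment on enqueue, decrement
    on dequeue, delete the group state when the count reaches zero."""

    def __init__(self):
        self.counts = {}   # group id -> messages currently on the queue
        self.deleted = []  # one entry each time a group's state is dropped

    def enqueue(self, group_id):
        self.counts[group_id] = self.counts.get(group_id, 0) + 1

    def dequeue(self, group_id):
        self.counts[group_id] -= 1
        if self.counts[group_id] == 0:
            del self.counts[group_id]
            self.deleted.append(group_id)


state = GroupCounter()
# The producer has sent 2 messages of a 3-message group (N=2, M=1) and the
# consumer drains them before the third arrives.
state.enqueue("A")
state.enqueue("A")
state.dequeue("A")
state.dequeue("A")
# The last message arrives late and is treated as the start of a new group.
state.enqueue("A")
state.dequeue("A")
```

Here state.deleted ends up as ["A", "A"]: the state for the one logical group
is created and dropped twice, at a point that depends only on the relative
speed of producer and consumer.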

> For the other proposed policy - Exclusive Consumer - then the group state would be associated with a particular Consumer instance, and remain present as long as the Consumer exists.  So in that case, yes - there is the potential that an unbounded number of group states could exist without some kind of reclaim strategy.   An end-of-group marker could be used, but that wouldn't prevent a DOS by a misbehaving app that creates endless groups w/o setting the end marker.  Perhaps a hard max-groups-per-Consumer limit, or a TTL for idle groups?

I'm not so worried about that, it's no worse than the fact that a client can 
create a huge number of sessions or consumers etc. Solving that family of DOS
attacks is a separate issue I think.

>
>
>
>>>
>>>
>>>>> For #2:
>>>>>
>>>>> The key for the header that contains the group identifier would be
>>>>> provided to the broker via configuration.
>>>>
>>>> A fixed name like 'qpid.group' seems less likely to confuse. Perhaps have
>>>> id default to 'qpid.group' but be configurable to another name in the
>>>> (unlikely?) event that it is required.
>>>>
>>>
>>> That's a good idea. The current AMQP 1.0 draft defines a message
>>> property header called "group-id", perhaps we should use
>>> "qpid.group-id" as the default instead of "qpid.group"?
>>
>> Should we be using the AMQP group-id? What's it for?
>>
>
>
> It appears to be for the exact same purpose - identify the group a message belongs to.  Set by the application and preserved to destination.  But I don't see any specific use cases proposed in the spec.
>
>
>>
>>>>> From the broker's perspective, the number of different group id values
>>>>> would be unlimited. And the value of group identifiers would not be
>>>>> provided ahead of time by configuration: the broker must learn them at
>>>>> runtime.
>>>>>
>>>>> For #3:
>>>>>
>>>>> This QIP defines two message group policies. Additional policies may be
>>>>> defined in the future.
>>>>>
>>>>> Policy #1: Exclusive Consumer
>>>>>
>>>>> With this policy, the broker would guarantee that all messages in a
>>>>> group would be delivered to the same client.
>>>>>
>>>>> This policy would be configured on a per-queue basis.
>>
>> We could configure policy on a per-group basis with the value of the
>> "qpid.group" property. E.g. qpid.group=exclusive.
>> That's more flexible and puts control of the policy in the hands of the
>> producer, rather than requiring an admin convention that associates a
>> queue name with a policy.
>>
>
> I'm not sure I completely understand: are you proposing the producer sets both the group identifier and a policy type identifier on a per-message basis?
>
>
>>>>> When the first message of a new message group becomes available for
>>>>> delivery, the broker will associate that group with the next available
>>>>> consumer. The broker would then guarantee that all messages from that
>>>>> group are delivered to that consumer only.
>>>>>
>>>>> The broker will maintain the group/client association for the lifetime
>>>>> of the client. Should the client die or cancel its subscription, any
>>>>> unacknowledged messages in the group will be assigned to a different
>>>>> client (preserving message order). Group/client associations are not
>>>>> maintained across broker restart. These associations must be replicated
>>>>> in a clustered broker.
>>>>>
>>>>> Policy #2: Sequenced Consumers
>>>>>
>>>>> With this policy, the broker would guarantee that the order in which
>>>>> messages in a group are processed by consumers is the same order in
>>>>> which the messages were enqueued. This guarantee would be upheld even if
>>>>> the messages of the group are processed by different consumers. No two
>>>>> messages from the same group would be processed in parallel by different
>>>>> consumers.
>>>>>
>>>>> Specifically, for any given group, the broker allows only the first N
>>>>> messages in the group to be available for delivery to a particular
>>>>> consumer. The value of N would be determined by the selected consumer's
>>>>> configured prefetch capacity. The broker blocks access to the remaining
>>>>> messages in that group by any other consumer. Once the selected consumer
>>>>> has acknowledged that first set of delivered messages, the broker allows
>>>>> the next messages in the group to be available for delivery. The next
>>>>> set of messages may be delivered to a different consumer.
>>>>>
>>>>> This policy would be configured on a per-queue basis. Configuration
>>>>> would include designating the key of the application header that
>>>>> specifies the group id.
>>>>>
>>>>> Note well that, with this policy, the consuming application has to:
>>>>>
>>>>> 1. ensure that it has completely processed the data in a received
>>>>> message before accepting that message, as described in Section 2.6.2.
>>>>> Transfer of Responsibility, of the AMQP-0.10 specification.
>>>>>
>>>>> 2. ensure that messages are not selectively acknowledged or released -
>>>>> order must be preserved in both cases.
>>>>
>>>> What happens if the consuming application fails to respect these
>>>> requirements?
>>>>
>>>
>>> Hmmm... good question. My initial thoughts are that this would be
>>> considered an application error, since preserving order is important
>>> for this feature. Perhaps the broker should respond with an error
>>> exception?
>>>
>>>
>>>>
>>>>> Note well that in the case of either of these proposed policies,
>>>>> distinct message groups would not block each other from delivery. For
>>>>> example, assume a queue contains messages from two different message
>>>>> groups - say group "A" and group "B" - and they are enqueued such that
>>>>> "A"'s messages are in front of "B". If the first message of group "A" is
>>>>> in the process of being consumed by a client, then the remaining "A"
>>>>> messages are blocked, but the messages of the "B" group are available
>>>>> for consumption - even though it is "behind" group "A" in the queue.
>>>>>
>>>>>
>>>>> ## Rationale
>>>>>
>>>>>
>>>>> The solution described above allows an application to designate a set
>>>>> of related messages, and provides policies for controlling the
>>>>> consumption of the message group.
>>>>>
>>>>>     * Goal: allow dynamic values for the group identifiers (no
>>>>>     preconfiguration of identifiers necessary)
>>>>>     * Goal: the number of message groups "in flight" should only be
>>>>>     limited by the available resources (no need to configure a hard
>>>>>     limit).
>>>>>     * Goal: manageability: visibility into the message groups
>>>>>     currently on a given queue
>>>>>     * Goal: manageability: purge, move, reroute messages at the group
>>>>>     level.
>>>>>
>>>>> ## Implementation Notes
>>>>>
>>>>>     * Queues: support configuration of the group identifier header key.
>>>>>     * Messages: provide access to group identifier.
>>>>>     * Queues: identify head message of next available message group.
>>>>>     * Queues: block the trailing messages in a given message group
>>>>>     from being consumed.
>>>>>     * Consumers: track the message group of the currently acquired
>>>>>     message(s).
>>>>>     * Clustering: ensure state is replicated within the cluster as
>>>>>     needed.
>>>>>
>>>>> ## Consequences
>>>>>
>>>>>     * __Development:__ No changes to the development process.
>>>>>     * __Release:__ No changes to the release process.
>>>>>     * __Documentation:__ User documentation will be needed to explain
>>>>>     the feature, and the steps to configure and manage the new feature.
>>>>>     * __Configuration:__ Yes: per-queue group identifier header key is
>>>>>     configurable. Queue state with regard to in-flight message groups
>>>>>     needs to be visible. Additional methods to purge/move/reroute a
>>>>>     message group.
>>>>>     * __Compatibility:__ Unlikely - new feature that would need to be
>>>>>     enabled manually. Applications wishing to use the feature would
>>>>>     need to implement a message grouping policy, and ensure the
>>>>>     processing of received message data is compliant with the desired
>>>>>     policy.
>>>>>
>>>>> ## References
>>>>>
>>>>>     * None.
>>>>>
>>>>> ## Contributor-in-Charge
>>>>>
>>>>> Kenneth Giusti,<kg...@redhat.com>
>>>>>
>>>>> ## Contributors
>>>>>
>>>>>     * Ted Ross,<tr...@redhat.com>
>>>>>
>>>>> ## Version
>>>>>
>>>>> 0.2
>>>>>
>>>>>
>>>>


Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
Sorry, didn't read entire mail last time...

On 07/20/2011 01:39 PM, Ken Giusti wrote:
[snip]
>>>
>>> In this case, it probably would be sufficient to maintain a counter
>>> in the group's state. I'm assuming that we'll need a group lookup on
>>> message arrival to find that state (or create it on the first
>>> received message). Increment the counter when a message in that
>>> group is enqueued, decrement when the message is dequeued. If
>>> decremented to zero, delete the group. Performance impact of such an
>>> approach is TBD, of course - probably need to pool the state objects
>>> to limit memory thrashing, etc....
>>>
>>
>> That doesn't sound right. The first N messages of a group could be
>> dequeued
>> before the N+1 message is enqueued, which would put your counter to 0
>> before the
>> entire group is processed. I think we need an explicit end of group
>> marker on
>> the last message in the group to allow the user to close a group in a
>> scenario
>> with lots of dynamic group ids. You could ignore this in cases where
>> there is a
>> small fixed set of groups that are used over a long time.
>>
>
>
> Ah, yes - sorry.   The lifetime of the group's state depends on the type of "Policy" that is being used (see below).  For the "Sequenced Consumers" policy - which is what I was thinking of in my last reply - the broker doesn't need to maintain state across all messages of the group.  With that policy, the state can be dropped once there are no more messages for that group present in the broker.
>

Again I don't get that. You're putting group boundaries at unpredictable 
arbitrary points depending on relative speed of producer/consumer. The producer 
may send the first N messages of a group containing N+M messages, then the 
consumer consumes the first N messages. Now the queue is empty so the next M 
messages are considered a new group, where N is impossible to predict in 
advance. So you randomly cut up the groups.
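
A minimal sketch of the objection above, assuming the counter scheme exactly as described (increment on enqueue, decrement on dequeue, delete at zero). The names `GroupCounters`, `run_scenario` and the group id "cart-42" are hypothetical and are not part of the QIP or of any Qpid API.

```python
class GroupCounters:
    """Per-group message counts; a group's state is dropped when its count hits zero."""

    def __init__(self):
        self.counts = {}    # group id -> messages of that group currently on the queue
        self.created = []   # group ids, in the order their state was (re)created

    def enqueue(self, group_id):
        if group_id not in self.counts:
            # First message seen for this id (or first since the state was dropped).
            self.counts[group_id] = 0
            self.created.append(group_id)
        self.counts[group_id] += 1

    def dequeue(self, group_id):
        self.counts[group_id] -= 1
        if self.counts[group_id] == 0:
            del self.counts[group_id]   # state reclaimed here


def run_scenario(n, m):
    """Producer sends n messages, the consumer drains them, then m more arrive."""
    state = GroupCounters()
    for _ in range(n):
        state.enqueue("cart-42")
    for _ in range(n):
        state.dequeue("cart-42")
    for _ in range(m):
        state.enqueue("cart-42")
    return state.created
```

With `run_scenario(3, 2)` the state for "cart-42" is created twice: once for the first three messages and again for the last two, so under this scheme the broker sees one logical group of five messages as two groups.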

> For the other proposed policy - Exclusive Consumer - the group state would be associated with a particular Consumer instance, and remain present as long as the Consumer exists.  So in that case, yes - there is the potential that an unbounded number of group states could exist without some kind of reclaim strategy.   An end-of-group marker could be used, but that wouldn't prevent a DoS by a misbehaving app that creates endless groups w/o setting the end marker.  Perhaps a hard max-groups-per-Consumer limit, or a TTL for idle groups?

I'm not so worried about that; it's no worse than the fact that a client can 
create a huge number of sessions or consumers etc. Solving that family of DoS 
attacks is a separate issue, I think.

[snip]
>>>>> This QIP defines two message group policies. Additional policies
>>>>> may
>>>>> be
>>>>> defined in the future.
>>>>>
>>>>> Policy 1: Exclusive Consumer
>>>>>
>>>>> With this policy, the broker would guarantee that all messages in
>>>>> a
>>>>> group
>>>>> would be delivered to the same client.
>>>>>
>>>>> This policy would be configured on a per-queue basis.
>>
>> We could configure policy on a per-group basis with the value of the
>> "qpid.group" property. E.g. qpid.group=exclusive.
>> That's more flexible and puts control of the policy in the hands of
>> the
>> producer, rather than requiring an admin convention that associates a
>> queue name
>> with a policy.
>>
>
> I'm not sure I completely understand: are you proposing the producer sets both the group identifier and a policy type identifier on a per-message basis?

As above I think we need an explicit "end group" message, so it would be 
symmetric to have an explicit "start group" message also that specifies the 
policy. This could be done with multiple properties (but that makes configuring 
the property name more complicated) or with a configuration string e.g. 
group-id=mygroup{policy=xxx} or somesuch. I'm not sure which approach I prefer; 
both are a little icky.
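
For illustration only, here is how the single configuration-string variant might be parsed. The syntax is an assumption extrapolated from the `mygroup{policy=xxx}` example above (comma-separated `key=value` options inside braces); nothing in the thread fixes the grammar, and `parse_group_spec` is a hypothetical helper.

```python
import re

# Assumed form: <group-id> optionally followed by {key=value,key=value}.
_GROUP_SPEC = re.compile(r"^(?P<group>[^{}]+)(?:\{(?P<options>[^{}]*)\})?$")


def parse_group_spec(value):
    """Split a header value such as 'mygroup{policy=exclusive}' into (group id, options)."""
    match = _GROUP_SPEC.match(value)
    if match is None:
        raise ValueError(f"malformed group spec: {value!r}")
    options = {}
    raw = match.group("options")
    if raw:
        for item in raw.split(","):
            key, sep, val = item.partition("=")
            if not sep or not key.strip():
                raise ValueError(f"malformed option {item!r} in {value!r}")
            options[key.strip()] = val.strip()
    return match.group("group").strip(), options
```

A plain value such as "mygroup" parses to the group id with an empty option set, so messages after the first would not need to repeat the policy.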



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Ken Giusti <kg...@redhat.com>.
Responses inline:

-K

----- Original Message -----
> On 07/20/2011 11:57 AM, Ken Giusti wrote:
> >
> > Alan - thanks for the input, replies inline....
> >
> > -K
> >
> > ----- Original Message -----
> >> On 07/01/2011 07:04 AM, Ken Giusti wrote:
> >>> Folks,
> >>>
> >>> Here's a second draft of the proposal. I've tried to incorporate
> >>> the
> >>> feedback provided during the last week. And I have tried to limit
> >>> the proposed feature set a bit more.
> >>>
> >>> Opinions welcome, thanks -
> >>>
> >>> -K
> >>>
> >>>
> >>> # Message Groups
> >>>
> >>> ## Status
> >>>
> >>> Draft
> >>>
> >>> ## Summary
> >>>
> >>> This document describes a new feature that would allow an
> >>> application to
> >>> classify a set of related messages as belonging to a group. This
> >>> document also
> >>> describes two policies that the broker could apply when delivering
> >>> a
> >>> message
> >>> group to one or more consumers.
> >>>
> >>>
> >>> ## Problem
> >>>
> >>> It would be useful to give an application the ability to classify
> >>> a
> >>> set of
> >>> messages as belonging to a single unit of work. Furthermore, if
> >>> the
> >>> broker can
> >>> identify messages belonging to the same unit of work, it can
> >>> enforce
> >>> policies
> >>> that control how that unit of work can be consumed.
> >>>
> >>> For example, it may be desirable to guarantee that a particular
> >>> set
> >>> of
> >>> messages are consumed by the same client, even in the case where
> >>> there are
> >>> multiple clients consuming from the same queue.
> >>>
> >>> In a different scenario, it may be permissible for different
> >>> clients
> >>> to consume
> >>> messages from the same group, as long as it can be guaranteed that
> >>> the
> >>> individual messages are not processed in parallel. In other words,
> >>> the broker
> >>> would ensure that messages are processed by consumers in the same
> >>> order in
> >>> which they were enqueued.
> >>>
> >>> For example, assume we have a shopping application that manages
> >>> items in a
> >>> virtual shopping cart. A user may add an item to their shopping
> >>> cart, then
> >>> change their mind and remove it. If the application sends an "add"
> >>> message to
> >>> the broker, immediately followed by a "remove" message, they will
> >>> be
> >>> queued in
> >>> the proper order - "add", then "remove".
> >>>
> >>> However, if there are multiple consumers, it is possible that once
> >>> a
> >>> consumer
> >>> acquires the "add" message, a different consumer may acquire the
> >>> "remove"
> >>> message. This allows both messages to be processed in parallel,
> >>> which could
> >>> result in the "remove" operation being performed before the "add"
> >>> operation.
> >>>
> >>>
> >>> ## Solution
> >>>
> >>> This QIP proposes the following:
> >>>
> >>> 1) provide the ability for a message producer to designate a set
> >>> of
> >>> messages
> >>> as belonging to the same group.
> >>>
> >>> 2) allow the broker to identify messages that belong to the same
> >>> group.
> >>>
> >>> 3) define policies for the broker that control the delivery of
> >>> messages
> >>> belonging to the same group.
> >>>
> >>> For #1:
> >>>
> >>> The sending application would define a message header that would
> >>> contain the
> >>> message's group identifier. The group identifier stored in that
> >>> header field
> >>> would be a string value determined by the application. Messages
> >>> from
> >>> the same
> >>> group would have the same group identifier value.
> >>
> >> The broker will likely have to keep state associated with a group.
> >> How
> >> does the
> >> broker know when a group ends so it can reclaim those resources?
> >>
> >
> >
> > In this case, it probably would be sufficient to maintain a counter
> > in the group's state. I'm assuming that we'll need a group lookup on
> > message arrival to find that state (or create it on the first
> > received message). Increment the counter when a message in that
> > group is enqueued, decrement when the message is dequeued. If
> > decremented to zero, delete the group. Performance impact of such an
> > approach is TBD, of course - probably need to pool the state objects
> > to limit memory thrashing, etc....
> >
> 
> That doesn't sound right. The first N messages of a group could be
> dequeued
> before the N+1 message is enqueued, which would put your counter to 0
> before the
> entire group is processed. I think we need an explicit end of group
> marker on
> the last message in the group to allow the user to close a group in a
> scenario
> with lots of dynamic group ids. You could ignore this in cases where
> there is a
> small fixed set of groups that are used over a long time.
> 


Ah, yes - sorry.   The lifetime of the group's state depends on the type of "Policy" that is being used (see below).  For the "Sequenced Consumers" policy - which is what I was thinking of in my last reply - the broker doesn't need to maintain state across all messages of the group.  With that policy, the state can be dropped once there are no more messages for that group present in the broker.  

For the other proposed policy - Exclusive Consumer - the group state would be associated with a particular Consumer instance, and remain present as long as the Consumer exists.  So in that case, yes - there is the potential that an unbounded number of group states could exist without some kind of reclaim strategy.   An end-of-group marker could be used, but that wouldn't prevent a DoS by a misbehaving app that creates endless groups w/o setting the end marker.  Perhaps a hard max-groups-per-Consumer limit, or a TTL for idle groups?
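
A rough sketch of how the two reclaim ideas could combine: a hard per-consumer cap on open groups, plus expiry of groups that have been idle for a TTL. The `GroupTable` class, its parameter names, and the use of caller-supplied timestamps are assumptions made for illustration; none of this is proposed broker code.

```python
class GroupTable:
    """Open groups for one consumer, bounded by a hard limit and an idle TTL."""

    def __init__(self, max_groups, idle_ttl):
        self.max_groups = max_groups
        self.idle_ttl = idle_ttl
        self.last_seen = {}   # group id -> time the last message of the group arrived

    def expire(self, now):
        """Drop groups idle for at least idle_ttl; return the ids that were dropped."""
        stale = [g for g, t in self.last_seen.items() if now - t >= self.idle_ttl]
        for group_id in stale:
            del self.last_seen[group_id]
        return stale

    def touch(self, group_id, now):
        """Record a message for group_id, refusing new groups beyond the limit."""
        self.expire(now)
        if group_id not in self.last_seen and len(self.last_seen) >= self.max_groups:
            raise RuntimeError("too many open groups for this consumer")
        self.last_seen[group_id] = now

    def close(self, group_id):
        """Explicit end-of-group marker: release the state immediately."""
        self.last_seen.pop(group_id, None)
```

An end-of-group marker maps to `close()`, while the limit and TTL cover applications that never send one.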



> >
> >
> >>> For #2:
> >>>
> >>> The key for the header that contains the group identifier would be
> >>> provided to
> >>> the broker via configuration.
> >>
> >> A fixed name like 'qpid.group' seems less likely to confuse.
> >> Perhaps
> >> have id
> >> default to 'qpid.group' but be configurable to another name in the
> >> (unlikely?)
> >> event that it is required.
> >>
> >
> > That's a good idea. The current AMQP 1.0 draft defines a message
> > property header called "group-id", perhaps we should use
> > "qpid.group-id" as the default instead of "qpid.group"?
> 
> Should we be using the AMQP group-id? What's it for?
> 


It appears to be for the exact same purpose - identify the group a message belongs to.  Set by the application and preserved to destination.  But I don't see any specific use cases proposed in the spec.


> 
> >>>    From the broker's perspective, the number of different group id
> >>>   values would be
> >>> unlimited. And the value of group identifiers would not be
> >>> provided
> >>> ahead of
> >>> time by configuration: the broker must learn them at runtime.
> >>>
> >>> For #3:
> >>>
> >>> This QIP defines two message group policies. Additional policies
> >>> may
> >>> be
> >>> defined in the future.
> >>>
> >>> Policy 1: Exclusive Consumer
> >>>
> >>> With this policy, the broker would guarantee that all messages in
> >>> a
> >>> group
> >>> would be delivered to the same client.
> >>>
> >>> This policy would be configured on a per-queue basis.
> 
> We could configure policy on a per-group basis with the value of the
> "qpid.group" property. E.g. qpid.group=exclusive.
> That's more flexible and puts control of the policy in the hands of
> the
> producer, rather than requiring an admin convention that associates a
> queue name
> with a policy.
> 

I'm not sure I completely understand: are you proposing the producer sets both the group identifier and a policy type identifier on a per-message basis?


> >>>  When the first
> >>> message
> >>> of a new message group becomes available for delivery, the broker
> >>> will
> >>> associate that group with the next available consumer. The broker
> >>> would then
> >>> guarantee that all messages from that group are delivered to that
> >>> consumer
> >>> only.
> >>>
> >>> The broker will maintain the group/client association for the
> >>> lifetime of the
> >>> client. Should the client die or cancel its subscription, any
> >>> unacknowledged
> >>> messages in the group will be assigned to a different client
> >>> (preserving
> >>> message order). Group/client associations are not maintained
> >>> across
> >>> broker
> >>> restart. These associations must be replicated in a clustered
> >>> broker.
> >>>
> >>>
> >>> Policy #2: Sequenced Consumers
> >>>
> >>> With this policy, the broker would guarantee that the order in
> >>> which
> >>> messages
> >>> in a group are processed by consumers is the same order in which
> >>> the
> >>> messages
> >>> were enqueued. This guarantee would be upheld even if the
> >>> messages
> >>> of the
> >>> group are processed by different consumers. No two messages from
> >>> the
> >>> same
> >>> group would be processed in parallel by different consumers.
> >>>
> >>> Specifically, for any given group, the broker allows only the
> >>> first
> >>> N messages
> >>> in the group to be available for delivery to a particular
> >>> consumer.
> >>> The value
> >>> of N would be determined by the selected consumer's configured
> >>> prefetch
> >>> capacity. The broker blocks access to the remaining messages in
> >>> that
> >>> group by
> >>> any other consumer. Once the selected consumer has acknowledged
> >>> that
> >>> first set
> >>> of delivered messages, the broker allows the next messages in the
> >>> group to be
> >>> available for delivery. The next set of messages may be delivered
> >>> to
> >>> a
> >>> different consumer.
> >>>
> >>> This policy would be configured on a per-queue basis.
> >>> Configuration
> >>> would
> >>> include designating the key of the application header that
> >>> specifies
> >>> the group
> >>> id.
> >>>
> >>> Note well that, with this policy, the consuming application has
> >>> to:
> >>>
> >>> 1. ensure that it has completely processed the data in a received
> >>> message
> >>> before accepting that message, as described in Section 2.6.2.
> >>> Transfer of
> >>> Responsibility, of the AMQP-0.10 specification.
> >>>
> >>> 2. ensure that messages are not selectively acknowledged or
> >>> released
> >>> - order
> >>> must be preserved in both cases.
> >>
> >> What happens if the consuming application fails to respect these
> >> requirements?
> >>
> >
> > Hmmm... good question. My initial thoughts are that this would be
> > considered an application error, since preserving order is important
> > for this feature. Perhaps the broker should respond with an error
> > exception?
> >
> >
> >>
> >>> Note well that in the case of either of these proposed policies,
> >>> distinct
> >>> message groups would not block each other from delivery. For
> >>> example, assume a
> >>> queue contains messages from two different message groups - say
> >>> group "A" and
> >>> group "B" - and they are enqueued such that "A"'s messages are in
> >>> front of
> >>> "B". If the first message of group "A" is in the process of being
> >>> consumed by a
> >>> client, then the remaining "A" messages are blocked, but the
> >>> messages of the
> >>> "B" group are available for consumption - even though it is
> >>> "behind"
> >>> group "A"
> >>> in the queue.
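
The non-blocking behaviour described in the quoted paragraph above can be sketched with a toy queue model. It assumes a prefetch of one message (N = 1) and the "Sequenced Consumers" rule that a group is unavailable while one of its messages is unacknowledged. `SequencedQueue` and its methods are hypothetical names, not broker code.

```python
class SequencedQueue:
    """Toy FIFO queue that never hands out two messages of one group at once."""

    def __init__(self):
        self.messages = []   # (group id, payload) in enqueue order
        self.busy = {}       # group id -> consumer holding an unacknowledged message

    def enqueue(self, group_id, payload):
        self.messages.append((group_id, payload))

    def acquire(self, consumer):
        """Return the oldest message whose group is not busy, or None."""
        for index, (group_id, payload) in enumerate(self.messages):
            if group_id not in self.busy:
                self.messages.pop(index)
                self.busy[group_id] = consumer
                return group_id, payload
        return None

    def acknowledge(self, consumer, group_id):
        """Release the group so its next message can go to any consumer."""
        if self.busy.get(group_id) != consumer:
            raise ValueError("consumer does not hold a message from this group")
        del self.busy[group_id]
```

With messages A:"add", A:"remove", B:"x" enqueued in that order, a first consumer acquires A:"add", a second consumer skips the blocked A:"remove" and acquires B:"x", and A:"remove" becomes available to either consumer only after "add" is acknowledged.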
> >>>
> >>>
> >>> ## Rationale
> >>>
> >>>
> >>> The solution described above allows an application to designate a
> >>> set of
> >>> related messages, and provides policies for controlling the
> >>> consumption of the
> >>> message group.
> >>>
> >>>    * Goal: allow dynamic values for the group identifiers (no
> >>>    preconfiguration of identifiers necessary)
> >>>    * Goal: the number of message groups "in flight" should only be
> >>>    limited by the available resources (no need to configure a hard
> >>>    limit).
> >>>    * Goal: manageability: visibility into the message groups
> >>>    currently on a given queue
> >>>    * Goal: manageability: purge, move, reroute messages at the
> >>>    group
> >>>    level.
> >>>
> >>> ## Implementation Notes
> >>>
> >>>    * Queues: support configuration of the group identifier header
> >>>    key.
> >>>    * Messages: provide access to group identifier.
> >>>    * Queues: identify head message of next available message
> >>>    group.
> >>>    * Queues: block the trailing messages in a given message group
> >>>    from being consumed.
> >>>    * Consumers: track the message group of the currently acquired
> >>>    message(s).
> >>>    * Clustering: ensure state is replicated within the cluster as
> >>>    needed.
> >>>
> >>> ## Consequences
> >>>
> >>>    * __Development:__ No changes to the development process.
> >>>    * __Release:__ No changes to the release process.
> >>>    * __Documentation:__ User documentation will be needed to
> >>>    explain
> >>>    the feature, and the steps to configure and manage the new
> >>>    feature.
> >>>    * __Configuration:__ Yes: per-queue group identifier header key
> >>>    is
> >>>    configurable. Queue state with regard to in-flight message
> >>>    groups
> >>>    needs to be visible. Additional methods to purge/move/reroute a
> >>>    message group.
> >>>    * __Compatibility:__ Unlikely - new feature that would need to
> >>>    be
> >>>    enabled manually. Applications wishing to use the feature would
> >>>    need to implement a message grouping policy, and ensure the
> >>>    processing of received message data is compliant with the
> >>>    desired
> >>>    policy.
> >>>
> >>> ## References
> >>>
> >>>    * None.
> >>>
> >>> ## Contributor-in-Charge
> >>>
> >>> Kenneth Giusti,<kg...@redhat.com>
> >>>
> >>> ## Contributors
> >>>
> >>>    * Ted Ross,<tr...@redhat.com>
> >>>
> >>> ## Version
> >>>
> >>> 0.2
> >>>
> >>>


Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
On 07/20/2011 11:57 AM, Ken Giusti wrote:
>
> Alan - thanks for the input, replies inline....
>
> -K
>
> ----- Original Message -----
>> On 07/01/2011 07:04 AM, Ken Giusti wrote:
>>> Folks,
>>>
>>> Here's a second draft of the proposal. I've tried to incorporate the
>>> feedback provided during the last week. And I have tried to limit
>>> the proposed feature set a bit more.
>>>
>>> Opinions welcome, thanks -
>>>
>>> -K
>>>
>>>
>>> # Message Groups
>>>
>>> ## Status
>>>
>>> Draft
>>>
>>> ## Summary
>>>
>>> This document describes a new feature that would allow an
>>> application to
>>> classify a set of related messages as belonging to a group. This
>>> document also
>>> describes two policies that the broker could apply when delivering a
>>> message
>>> group to one or more consumers.
>>>
>>>
>>> ## Problem
>>>
>>> It would be useful to give an application the ability to classify a
>>> set of
>>> messages as belonging to a single unit of work. Furthermore, if the
>>> broker can
>>> identify messages belonging to the same unit of work, it can enforce
>>> policies
>>> that control how that unit of work can be consumed.
>>>
>>> For example, it may be desirable to guarantee that a particular set
>>> of
>>> messages are consumed by the same client, even in the case where
>>> there are
>>> multiple clients consuming from the same queue.
>>>
>>> In a different scenario, it may be permissible for different clients
>>> to consume
>>> messages from the same group, as long as it can be guaranteed that
>>> the
>>> individual messages are not processed in parallel. In other words,
>>> the broker
>>> would ensure that messages are processed by consumers in the same
>>> order in
>>> which they were enqueued.
>>>
>>> For example, assume we have a shopping application that manages
>>> items in a
>>> virtual shopping cart. A user may add an item to their shopping
>>> cart, then
>>> change their mind and remove it. If the application sends an "add"
>>> message to
>>> the broker, immediately followed by a "remove" message, they will be
>>> queued in
>>> the proper order - "add", then "remove".
>>>
>>> However, if there are multiple consumers, it is possible that once a
>>> consumer
>>> acquires the "add" message, a different consumer may acquire the
>>> "remove"
>>> message. This allows both messages to be processed in parallel,
>>> which could
>>> result in the "remove" operation being performed before the "add"
>>> operation.
>>>
>>>
>>> ## Solution
>>>
>>> This QIP proposes the following:
>>>
>>> 1) provide the ability for a message producer to designate a set of
>>> messages
>>> as belonging to the same group.
>>>
>>> 2) allow the broker to identify messages that belong to the same
>>> group.
>>>
>>> 3) define policies for the broker that control the delivery of
>>> messages
>>> belonging to the same group.
>>>
>>> For #1:
>>>
>>> The sending application would define a message header that would
>>> contain the
>>> message's group identifier. The group identifier stored in that
>>> header field
>>> would be a string value determined by the application. Messages from
>>> the same
>>> group would have the same group identifier value.
>>
>> The broker will likely have to keep state associated with a group. How
>> does the
>> broker know when a group ends so it can reclaim those resources?
>>
>
>
> In this case, it probably would be sufficient to maintain a counter in the group's state.  I'm assuming that we'll need a group lookup on message arrival to find that state (or create it on the first received message).   Increment the counter when a message in that group is enqueued, decrement when the message is dequeued.  If decremented to zero, delete the group.   Performance impact of such an approach is TBD, of course - probably need to pool the state objects to limit memory thrashing, etc....
>

That doesn't sound right. The first N messages of a group could be dequeued 
before the N+1 message is enqueued, which would put your counter to 0 before the 
entire group is processed. I think we need an explicit end of group marker on 
the last message in the group to allow the user to close a group in a scenario 
with lots of dynamic group ids. You could ignore this in cases where there is a 
small fixed set of groups that are used over a long time.

>
>
>>> For #2:
>>>
>>> The key for the header that contains the group identifier would be
>>> provided to
>>> the broker via configuration.
>>
>> A fixed name like 'qpid.group' seems less likely to confuse. Perhaps
>> have id
>> default to 'qpid.group' but be configurable to another name in the
>> (unlikely?)
>> event that it is required.
>>
>
> That's a good idea.  The current AMQP 1.0 draft defines a message property header called "group-id", perhaps we should use "qpid.group-id" as the default instead of "qpid.group"?

Should we be using the AMQP group-id? What's it for?


>>>    From the broker's perspective, the number of different group id
>>>   values would be
>>> unlimited. And the value of group identifiers would not be provided
>>> ahead of
>>> time by configuration: the broker must learn them at runtime.
>>>
>>> For #3:
>>>
>>> This QIP defines two message group policies. Additional policies may
>>> be
>>> defined in the future.
>>>
>>> Policy 1: Exclusive Consumer
>>>
>>> With this policy, the broker would guarantee that all messages in a
>>> group
>>> would be delivered to the same client.
>>>
>>> This policy would be configured on a per-queue basis.

We could configure policy on a per-group basis with the value of the 
"qpid.group" property. E.g. qpid.group=exclusive.
That's more flexible and puts control of the policy in the hands of the 
producer, rather than requiring an admin convention that associates a queue name 
with a policy.

>>>  When the first
>>> message
>>> of a new message group becomes available for delivery, the broker
>>> will
>>> associate that group with the next available consumer. The broker
>>> would then
>>> guarantee that all messages from that group are delivered to that
>>> consumer
>>> only.
>>>
>>> The broker will maintain the group/client association for the
>>> lifetime of the
>>> client. Should the client die or cancel its subscription, any
>>> unacknowledged
>>> messages in the group will be assigned to a different client
>>> (preserving
>>> message order). Group/client associations are not maintained across
>>> broker
>>> restart. These associations must be replicated in a clustered
>>> broker.
>>>
>>>
>>> Policy #2: Sequenced Consumers
>>>
>>> With this policy, the broker would guarantee that the order in which
>>> messages
>>> in a group are processed by consumers is the same order in which the
>>> messages
>>> were enqueued. This guarantee would be upheld even if the messages
>>> of the
>>> group are processed by different consumers. No two messages from the
>>> same
>>> group would be processed in parallel by different consumers.
>>>
>>> Specifically, for any given group, the broker allows only the first
>>> N messages
>>> in the group to be available for delivery to a particular consumer.
>>> The value
>>> of N would be determined by the selected consumer's configured
>>> prefetch
>>> capacity. The broker blocks access to the remaining messages in that
>>> group by
>>> any other consumer. Once the selected consumer has acknowledged that
>>> first set
>>> of delivered messages, the broker allows the next messages in the
>>> group to be
>>> available for delivery. The next set of messages may be delivered to
>>> a
>>> different consumer.
>>>
>>> This policy would be configured on a per-queue basis. Configuration
>>> would
>>> include designating the key of the application header that specifies
>>> the group
>>> id.
>>>
>>> Note well that, with this policy, the consuming application has to:
>>>
>>> 1. ensure that it has completely processed the data in a received
>>> message
>>> before accepting that message, as described in Section 2.6.2.
>>> Transfer of
>>> Responsibility, of the AMQP-0.10 specification.
>>>
>>> 2. ensure that messages are not selectively acknowledged or released
>>> - order
>>> must be preserved in both cases.
>>
>> What happens if the consuming application fails to respect these
>> requirements?
>>
>
> Hmmm... good question.  My initial thoughts are that this would be considered an application error, since preserving order is important for this feature.  Perhaps the broker should respond with an error exception?
>
>
>>
>>> Note well that in the case of either of these proposed policies,
>>> distinct
>>> message groups would not block each other from delivery. For
>>> example, assume a
>>> queue contains messages from two different message groups - say
>>> group "A" and
>>> group "B" - and they are enqueued such that "A"'s messages are in
>>> front of
>>> "B". If the first message of group "A" is in the process of being
>>> consumed by a
>>> client, then the remaining "A" messages are blocked, but the
>>> messages of the
>>> "B" group are available for consumption - even though it is "behind"
>>> group "A"
>>> in the queue.
>>>
>>>
>>> ## Rationale
>>>
>>>
>>> The solution described above allows an application to designate a
>>> set of
>>> related messages, and provides policies for controlling the
>>> consumption of the
>>> message group.
>>>
>>>    * Goal: allow dynamic values for the group identifiers (no
>>>    preconfiguration of identifiers necessary)
>>>    * Goal: the number of message groups "in flight" should only be
>>>    limited by the available resources (no need to configure a hard
>>>    limit).
>>>    * Goal: manageability: visibility into the message groups
>>>    currently on a given queue
>>>    * Goal: manageability: purge, move, reroute messages at the group
>>>    level.
>>>
>>> ## Implementation Notes
>>>
>>>    * Queues: support configuration of the group identifier header
>>>    key.
>>>    * Messages: provide access to group identifier.
>>>    * Queues: identify head message of next available message group.
>>>    * Queues: block the trailing messages in a given message group
>>>    from being consumed.
>>>    * Consumers: track the message group of the currently acquired
>>>    message(s).
>>>    * Clustering: ensure state is replicated within the cluster as
>>>    needed.
>>>
>>> ## Consequences
>>>
>>>    * __Development:__ No changes to the development process.
>>>    * __Release:__ No changes to the release process.
>>>    * __Documentation:__ User documentation will be needed to explain
>>>    the feature, and the steps to configure and manage the new
>>>    feature.
>>>    * __Configuration:__ Yes: per-queue group identifier header key is
>>>    configurable. Queue state with regard to in-flight message groups
>>>    needs to be visible. Additional methods to purge/move/reroute a
>>>    message group.
>>>    * __Compatibility:__ Unlikely - new feature that would need to be
>>>    enabled manually. Applications wishing to use the feature would
>>>    need to implement a message grouping policy, and ensure the
>>>    processing of received message data is compliant with the desired
>>>    policy.
>>>
>>> ## References
>>>
>>>    * None.
>>>
>>> ## Contributor-in-Charge
>>>
>>> Kenneth Giusti,<kg...@redhat.com>
>>>
>>> ## Contributors
>>>
>>>    * Ted Ross,<tr...@redhat.com>
>>>
>>> ## Version
>>>
>>> 0.2
>>>
>>>


Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 07/20/2011 04:57 PM, Ken Giusti wrote:
> ----- Original Message -----
>> On 07/01/2011 07:04 AM, Ken Giusti wrote:
>>> For #2:
>>>
>>> The key for the header that contains the group identifier would be
>>> provided to
>>> the broker via configuration.
>>
>> A fixed name like 'qpid.group' seems less likely to confuse. Perhaps
>> have id
>> default to 'qpid.group' but be configurable to another name in the
>> (unlikely?)
>> event that it is required.
>>
>
> That's a good idea.  The current AMQP 1.0 draft defines a message property header called "group-id", perhaps we should use "qpid.group-id" as the default instead of "qpid.group"?

The advantage of a configurable header is that the group can be keyed 
off some application-meaningful property, and the application need not be 
altered just to introduce a particular arbitrary property.

I actually think that having a default makes it more confusing. You will 
still have to turn grouping on explicitly, right? So why not have the 
mechanism for so doing be the identification of a header from which the 
message's group can be determined? I don't find that confusing.
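
As a sketch of that idea: grouping is enabled for a queue simply by naming the header that carries the group, and is off when no header is named. The function `group_of` and the "customer-id" header are hypothetical, chosen only to show an existing application property being reused as the group key.

```python
def group_of(headers, group_header):
    """Return the message's group id, or None if the message is ungrouped.

    headers      -- the message's application headers (a dict)
    group_header -- the header key configured on the queue, or None when
                    grouping has not been turned on for that queue
    """
    if group_header is None:
        return None                 # grouping is off for this queue
    value = headers.get(group_header)
    return None if value is None else str(value)
```

A queue configured with `group_header="customer-id"` would then group messages by a property the application already sets, with no change to the producer.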

>>> 2. ensure that messages are not selectively acknowledged or released
>>> - order
>>> must be preserved in both cases.
>>
>> What happens if the consuming application fails to respect these
>> requirements?
>>
>
> Hmmm... good question.  My initial thoughts are that this would be considered an application error, since preserving order is important for this feature.  Perhaps the broker should respond with an error exception?

I think that is an application error. The aim is to allow processing of 
the messages in order. If the application fails to sensibly communicate 
regarding that processing, it is itself responsible for the results.

(I could imagine in the future ways of preventing this or alerting 
applications about it, but I think for now that should be out of scope).



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Ken Giusti <kg...@redhat.com>.
Alan - thanks for the input, replies inline....

-K

----- Original Message -----
> On 07/01/2011 07:04 AM, Ken Giusti wrote:
> > Folks,
> >
> > Here's a second draft of the proposal. I've tried to incorporate the
> > feedback provided during the last week. And I have tried to limit
> > the proposed feature set a bit more.
> >
> > Opinions welcome, thanks -
> >
> > -K
> >
> >
> > # Message Groups
> >
> > ## Status
> >
> > Draft
> >
> > ## Summary
> >
> > This document describes a new feature that would allow an
> > application to
> > classify a set of related messages as belonging to a group. This
> > document also
> > describes two policies that the broker could apply when delivering a
> > message
> > group to one or more consumers.
> >
> >
> > ## Problem
> >
> > It would be useful to give an application the ability to classify a
> > set of
> > messages as belonging to a single unit of work. Furthermore, if the
> > broker can
> > identify messages belonging to the same unit of work, it can enforce
> > policies
> > that control how that unit of work can be consumed.
> >
> > For example, it may be desirable to guarantee that a particular set
> > of
> > messages are consumed by the same client, even in the case where
> > there are
> > multiple clients consuming from the same queue.
> >
> > In a different scenario, it may be permissible for different clients
> > to consume
> > messages from the same group, as long as it can be guaranteed that
> > the
> > individual messages are not processed in parallel. In other words,
> > the broker
> > would ensure that messages are processed by consumers in the same
> > order in
> > which they were enqueued.
> >
> > For example, assume we have a shopping application that manages
> > items in a
> > virtual shopping cart. A user may add an item to their shopping
> > cart, then
> > change their mind and remove it. If the application sends an "add"
> > message to
> > the broker, immediately followed by a "remove" message, they will be
> > queued in
> > the proper order - "add", then "remove".
> >
> > However, if there are multiple consumers, it is possible that once a
> > consumer
> > acquires the "add" message, a different consumer may acquire the
> > "remove"
> > message. This allows both messages to be processed in parallel,
> > which could
> > result in the "remove" operation being performed before the "add"
> > operation.
> >
> >
> > ## Solution
> >
> > This QIP proposes the following:
> >
> > 1) provide the ability for a message producer to designate a set of
> > messages
> > as belonging to the same group.
> >
> > 2) allow the broker to identify messages that belong to the same
> > group.
> >
> > 3) define policies for the broker that control the delivery of
> > messages
> > belonging to the same group.
> >
> > For #1:
> >
> > The sending application would define a message header that would
> > contain the
> > message's group identifier. The group identifier stored in that
> > header field
> > would be a string value determined by the application. Messages from
> > the same
> > group would have the same group identifier value.
> 
> The broker will likely have to keep state associated with a group. How
> does the
> broker know when a group ends so it can reclaim those resources?
> 


In this case, it would probably be sufficient to maintain a counter in the group's state.  I'm assuming that we'll need a group lookup on message arrival to find that state (or create it on the first received message).  Increment the counter when a message in that group is enqueued, and decrement it when the message is dequeued.  If the counter reaches zero, delete the group.  The performance impact of such an approach is TBD, of course - we would probably need to pool the state objects to limit memory thrashing, etc.
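
As a rough illustration of the counter idea (a sketch only, not broker code; GroupTable, GroupState, on_enqueue and on_dequeue are names made up for this example, and pooling of state objects is left out):

```python
class GroupState:
    """Per-group bookkeeping; only a message counter in this sketch."""

    def __init__(self):
        self.count = 0


class GroupTable:
    """Hypothetical per-queue table mapping group id -> GroupState."""

    def __init__(self):
        self._groups = {}

    def on_enqueue(self, group_id):
        # Look up the state, creating it on the first message of a group.
        state = self._groups.setdefault(group_id, GroupState())
        state.count += 1
        return state

    def on_dequeue(self, group_id):
        state = self._groups[group_id]
        state.count -= 1
        if state.count == 0:
            # The last message of the group has left the queue,
            # so the state can be reclaimed.
            del self._groups[group_id]

    def active_groups(self):
        return sorted(self._groups)
```

With this scheme a group "ends" implicitly whenever the queue holds no more of its messages, and is simply recreated if another message with the same id arrives later.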



> > For #2:
> >
> > The key for the header that contains the group identifier would be
> > provided to
> > the broker via configuration.
> 
> A fixed name like 'qpid.group' seems less likely to confuse. Perhaps
> have id
> default to 'qpid.group' but be configurable to another name in the
> (unlikely?)
> event that it is required.
> 

That's a good idea.  The current AMQP 1.0 draft defines a message property header called "group-id", perhaps we should use "qpid.group-id" as the default instead of "qpid.group"?


> >  From the broker's perspective, the number of different group id
> >  values would be
> > unlimited. And the value of group identifiers would not be provided
> > ahead of
> > time by configuration: the broker must learn them at runtime.
> >
> > For #3:
> >
> > This QIP defines two message group policies. Additional policies may
> > be
> > defined in the future.
> >
> > Policy 1: Exclusive Consumer
> >
> > With this policy, the broker would guarantee that all messages in a
> > group
> > would be delivered to the same client.
> >
> > This policy would be configured on a per-queue basis. When the first
> > message
> > of a new message group becomes available for delivery, the broker
> > will
> > associate that group with the next available consumer. The broker
> > would then
> > guarantee that all messages from that group are delivered to that
> > consumer
> > only.
> >
> > The broker will maintain the group/client association for the
> > lifetime of the
> > client. Should the client die or cancel its subscription, any
> > unacknowledged
> > messages in the group will be assigned to a different client
> > (preserving
> > message order). Group/client associations are not maintained across
> > broker
> > restart. These associations must be replicated in a clustered
> > broker.
> >
> >
> > Policy #2: Sequenced Consumers
> >
> > With this policy, the broker would guarantee that the order in which
> > messages
> > in a group are processed by consumers is the same order in which the
> > messages
> > were enqueued. This guarantee would be upheld even if the messages
> > of the
> > group are processed by different consumers. No two messages from the
> > same
> > group would be processed in parallel by different consumers.
> >
> > Specifically, for any given group, the broker allows only the first
> > N messages
> > in the group to be available for delivery to a particular consumer.
> > The value
> > of N would be determined by the selected consumer's configured
> > prefetch
> > capacity. The broker blocks access to the remaining messages in that
> > group by
> > any other consumer. Once the selected consumer has acknowledged that
> > first set
> > of delivered messages, the broker allows the next messages in the
> > group to be
> > available for delivery. The next set of messages may be delivered to
> > a
> > different consumer.
> >
> > This policy would be configured on a per-queue basis. Configuration
> > would
> > include designating the key of the application header that specifies
> > the group
> > id.
> >
> > Note well that, with this policy, the consuming application has to:
> >
> > 1. ensure that it has completely processed the data in a received
> > message
> > before accepting that message, as described in Section 2.6.2.
> > Transfer of
> > Responsibility, of the AMQP-0.10 specification.
> >
> > 2. ensure that messages are not selectively acknowledged or released
> > - order
> > must be preserved in both cases.
> 
> What happens if the consuming application fails to respect these
> requirements?
> 

Hmmm... good question.  My initial thoughts are that this would be considered an application error, since preserving order is important for this feature.  Perhaps the broker should respond with an error exception?




Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
On 07/01/2011 07:04 AM, Ken Giusti wrote:
> Folks,
>
> Here's a second draft of the proposal.  I've tried to incorporate the feedback provided during the last week.  And I have tried to limit the proposed feature set a bit more.
>
> Opinions welcome, thanks -
>
> -K
>
>
> # Message Groups
>
> ## Status
>
> Draft
>
> ## Summary
>
> This document describes a new feature that would allow an application to
> classify a set of related messages as belonging to a group. This document also
> describes two policies that the broker could apply when delivering a message
> group to one or more consumers.
>
>
> ## Problem
>
> It would be useful to give an application the ability to classify a set of
> messages as belonging to a single unit of work.  Furthermore, if the broker can
> identify messages belonging to the same unit of work, it can enforce policies
> that control how that unit of work can be consumed.
>
> For example, it may be desirable to guarantee that a particular set of
> messages are consumed by the same client, even in the case where there are
> multiple clients consuming from the same queue.
>
> In a different scenario, it may be permissible for different clients to consume
> messages from the same group, as long as it can be guaranteed that the
> individual messages are not processed in parallel.  In other words, the broker
> would ensure that messages are processed by consumers in the same order in
> which they were enqueued.
>
> For example, assume we have a shopping application that manages items in a
> virtual shopping cart.  A user may add an item to their shopping cart, then
> change their mind and remove it.  If the application sends an "add" message to
> the broker, immediately followed by a "remove" message, they will be queued in
> the proper order - "add", then "remove".
>
> However, if there are multiple consumers, it is possible that once a consumer
> acquires the "add" message, a different consumer may acquire the "remove"
> message.  This allows both messages to be processed in parallel, which could
> result in the "remove" operation being performed before the "add" operation.
>
>
> ## Solution
>
> This QIP proposes the following:
>
> 1) provide the ability for a message producer to designate a set of messages
> as belonging to the same group.
>
> 2) allow the broker to identify messages that belong to the same group.
>
> 3) define policies for the broker that control the delivery of messages
> belonging to the same group.
>
> For #1:
>
> The sending application would define a message header that would contain the
> message's group identifier.  The group identifier stored in that header field
> would be a string value determined by the application.  Messages from the same
> group would have the same group identifier value.

The broker will likely have to keep state associated with a group. How does the 
broker know when a group ends so it can reclaim those resources?

> For #2:
>
> The key for the header that contains the group identifier would be provided to
> the broker via configuration.

A fixed name like 'qpid.group' seems less likely to confuse. Perhaps have id 
default to 'qpid.group' but be configurable to another name in the (unlikely?) 
event that it is required.

>  From the broker's perspective, the number of different group id values would be
> unlimited.  And the value of group identifiers would not be provided ahead of
> time by configuration: the broker must learn them at runtime.
>
> For #3:
>
> This QIP defines two message group policies.  Additional policies may be
> defined in the future.
>
> Policy 1: Exclusive Consumer
>
> With this policy, the broker would guarantee that all messages in a group
> would be delivered to the same client.
>
> This policy would be configured on a per-queue basis.  When the first message
> of a new message group becomes available for delivery, the broker will
> associate that group with the next available consumer.  The broker would then
> guarantee that all messages from that group are delivered to that consumer
> only.
>
> The broker will maintain the group/client association for the lifetime of the
> client.  Should the client die or cancel its subscription, any unacknowledged
> messages in the group will be assigned to a different client (preserving
> message order).  Group/client associations are not maintained across broker
> restart.  These associations must be replicated in a clustered broker.
>
>
> Policy #2: Sequenced Consumers
>
> With this policy, the broker would guarantee that the order in which messages
> in a group are processed by consumers is the same order in which the messages
> were enqueued.  This guarantee would be upheld even if the messages of the
> group are processed by different consumers.  No two messages from the same
> group would be processed in parallel by different consumers.
>
> Specifically, for any given group, the broker allows only the first N messages
> in the group to be available for delivery to a particular consumer.  The value
> of N would be determined by the selected consumer's configured prefetch
> capacity.  The broker blocks access to the remaining messages in that group by
> any other consumer.  Once the selected consumer has acknowledged that first set
> of delivered messages, the broker allows the next messages in the group to be
> available for delivery.  The next set of messages may be delivered to a
> different consumer.
>
> This policy would be configured on a per-queue basis.  Configuration would
> include designating the key of the application header that specifies the group
> id.
>
> Note well that, with this policy, the consuming application has to:
>
> 1. ensure that it has completely processed the data in a received message
> before accepting that message, as described in Section 2.6.2. Transfer of
> Responsibility, of the AMQP-0.10 specification.
>
> 2. ensure that messages are not selectively acknowledged or released - order
> must be preserved in both cases.

What happens if the consuming application fails to respect these requirements?




Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Ken Giusti <kg...@redhat.com>.
Folks,

Here's a second draft of the proposal.  I've tried to incorporate the feedback provided during the last week.  And I have tried to limit the proposed feature set a bit more.

Opinions welcome, thanks -

-K


# Message Groups

## Status

Draft

## Summary

This document describes a new feature that would allow an application to
classify a set of related messages as belonging to a group. This document also
describes two policies that the broker could apply when delivering a message
group to one or more consumers.


## Problem

It would be useful to give an application the ability to classify a set of
messages as belonging to a single unit of work.  Furthermore, if the broker can
identify messages belonging to the same unit of work, it can enforce policies
that control how that unit of work can be consumed.

For example, it may be desirable to guarantee that a particular set of
messages are consumed by the same client, even in the case where there are
multiple clients consuming from the same queue.

In a different scenario, it may be permissible for different clients to consume
messages from the same group, as long as it can be guaranteed that the
individual messages are not processed in parallel.  In other words, the broker
would ensure that messages are processed by consumers in the same order in
which they were enqueued.

For example, assume we have a shopping application that manages items in a
virtual shopping cart.  A user may add an item to their shopping cart, then
change their mind and remove it.  If the application sends an "add" message to
the broker, immediately followed by a "remove" message, they will be queued in
the proper order - "add", then "remove".

However, if there are multiple consumers, it is possible that once a consumer
acquires the "add" message, a different consumer may acquire the "remove"
message.  This allows both messages to be processed in parallel, which could
result in the "remove" operation being performed before the "add" operation.


## Solution

This QIP proposes the following:

1) provide the ability for a message producer to designate a set of messages
as belonging to the same group.

2) allow the broker to identify messages that belong to the same group.

3) define policies for the broker that control the delivery of messages
belonging to the same group.

For #1:

The sending application would define a message header that would contain the
message's group identifier.  The group identifier stored in that header field
would be a string value determined by the application.  Messages from the same
group would have the same group identifier value.

For #2:

The key for the header that contains the group identifier would be provided to
the broker via configuration.

From the broker's perspective, the number of different group id values would be
unlimited.  And the value of group identifiers would not be provided ahead of
time by configuration: the broker must learn them at runtime.
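
To make #1 and #2 concrete, here is a minimal sketch of how a broker might pull the group id out of a message using the configured key. The header name "cart-id", the function name group_id_of and the treatment of messages that lack the header (they are simply ungrouped) are assumptions made for this example only; the QIP does not define them.

```python
def group_id_of(headers, group_header_key):
    """Return the message's group id, or None if the message is ungrouped.

    `headers` stands in for the message's application header map and
    `group_header_key` for the per-queue configured key (both illustrative).
    """
    value = headers.get(group_header_key)
    return None if value is None else str(value)


seen_groups = set()  # group ids are learned at runtime, never preconfigured

for headers in (
    {"cart-id": "cart-17", "op": "add"},
    {"cart-id": "cart-17", "op": "remove"},
    {"op": "ping"},  # no group header: treated as ungrouped in this sketch
):
    gid = group_id_of(headers, "cart-id")
    if gid is not None:
        seen_groups.add(gid)
```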

For #3:

This QIP defines two message group policies.  Additional policies may be
defined in the future.

Policy #1: Exclusive Consumer

With this policy, the broker would guarantee that all messages in a group
would be delivered to the same client.

This policy would be configured on a per-queue basis.  When the first message
of a new message group becomes available for delivery, the broker will
associate that group with the next available consumer.  The broker would then
guarantee that all messages from that group are delivered to that consumer
only.

The broker will maintain the group/client association for the lifetime of the
client.  Should the client die or cancel its subscription, any unacknowledged
messages in the group will be assigned to a different client (preserving
message order).  Group/client associations are not maintained across broker
restart.  These associations must be replicated in a clustered broker.
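
The association rules above could be sketched roughly as follows. This is an illustration under assumptions, not the proposed implementation: "next available consumer" is modelled as simple round-robin, consumers are plain strings, and cluster replication is not modelled. The class and method names are invented for the example.

```python
class ExclusiveGroupAssigner:
    """Hypothetical tracker of group -> consumer associations for one queue."""

    def __init__(self):
        self._owner = {}      # group id -> consumer that owns the group
        self._consumers = []  # subscribed consumers, in rotation order
        self._next = 0        # round-robin cursor (an assumption)

    def subscribe(self, consumer):
        self._consumers.append(consumer)

    def consumer_for(self, group_id):
        """Return the only consumer allowed to receive this group's messages."""
        if group_id not in self._owner:
            if not self._consumers:
                raise LookupError("no consumers subscribed")
            consumer = self._consumers[self._next % len(self._consumers)]
            self._next += 1
            self._owner[group_id] = consumer
        return self._owner[group_id]

    def cancel(self, consumer):
        """Consumer died or cancelled: free its groups for reassignment."""
        self._consumers.remove(consumer)
        for gid in [g for g, c in self._owner.items() if c == consumer]:
            del self._owner[gid]
```

Because the table lives only in memory, the associations are lost on restart, which matches the statement above that they are not maintained across broker restart.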


Policy #2: Sequenced Consumers

With this policy, the broker would guarantee that the order in which messages
in a group are processed by consumers is the same order in which the messages
were enqueued.  This guarantee would be upheld even if the messages of the
group are processed by different consumers.  No two messages from the same
group would be processed in parallel by different consumers.

Specifically, for any given group, the broker allows only the first N messages
in the group to be available for delivery to a particular consumer.  The value
of N would be determined by the selected consumer's configured prefetch
capacity.  The broker blocks access to the remaining messages in that group by
any other consumer.  Once the selected consumer has acknowledged that first set
of delivered messages, the broker allows the next messages in the group to be
available for delivery.  The next set of messages may be delivered to a
different consumer.

This policy would be configured on a per-queue basis.  Configuration would
include designating the key of the application header that specifies the group
id.
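
A rough sketch of the sequenced behaviour, assuming a single in-memory queue: a group is held by one consumer while that consumer has unacknowledged messages from it, and is released once they are all acknowledged. The class SequencedQueue and its methods are invented for this illustration; prefetch is modelled simply by how many times a consumer calls acquire() before acknowledging.

```python
from collections import deque


class SequencedQueue:
    """Hypothetical queue applying the Sequenced Consumers policy."""

    def __init__(self):
        self._messages = deque()  # (group id, body) in enqueue order
        self._owner = {}          # group id -> consumer currently holding it
        self._unacked = {}        # group id -> delivered but unacknowledged count

    def enqueue(self, group_id, body):
        self._messages.append((group_id, body))

    def acquire(self, consumer):
        """Deliver the oldest message this consumer may take, or None."""
        for i, (gid, body) in enumerate(self._messages):
            # A group is available if nobody holds it, or this consumer does.
            if self._owner.get(gid, consumer) == consumer:
                del self._messages[i]
                self._owner[gid] = consumer
                self._unacked[gid] = self._unacked.get(gid, 0) + 1
                return gid, body
        return None

    def acknowledge(self, group_id):
        """Record one acknowledgement; release the group when none remain."""
        self._unacked[group_id] -= 1
        if self._unacked[group_id] == 0:
            del self._unacked[group_id]
            del self._owner[group_id]
```

In the shopping cart example, a second consumer calling acquire() while the "add" message is still unacknowledged would not be given the "remove" message; it would receive a message from some other group, or nothing.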

Note well that, with this policy, the consuming application has to:

1. ensure that it has completely processed the data in a received message
before accepting that message, as described in Section 2.6.2, "Transfer of
Responsibility", of the AMQP 0-10 specification.

2. ensure that messages are not selectively acknowledged or released - order
must be preserved in both cases.
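
On the consumer side, the two requirements amount to a loop like the one below. This is only a sketch: `process` and `accept` are placeholder callables supplied by the application, not calls from any Qpid client API.

```python
def consume_in_order(deliveries, process, accept):
    """Process each delivered message fully, then accept it, strictly in order.

    If process() raises, the loop stops: the failed message and everything
    after it remain unaccepted, so nothing is acknowledged out of order.
    """
    for message in deliveries:
        process(message)  # finish all work on the message first ...
        accept(message)   # ... and only then accept (acknowledge) it
```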

Note well that in the case of either of these proposed policies, distinct
message groups would not block each other from delivery.  For example, assume a
queue contains messages from two different message groups - say group "A" and
group "B" - and they are enqueued such that "A"'s messages are in front of
"B". If the first message of group "A" is in the process of being consumed by a
client, then the remaining "A" messages are blocked, but the messages of the
"B" group are available for consumption - even though they are "behind" group "A"
in the queue.


## Rationale


The solution described above allows an application to designate a set of
related messages, and provides policies for controlling the consumption of the
message group.

 * Goal: allow dynamic values for the group identifiers (no preconfiguration of identifiers necessary)
 * Goal: the number of message groups "in flight" should only be limited by the available resources (no need to configure a hard limit).
 * Goal: manageability: visibility into the message groups currently on a given queue
 * Goal: manageability: purge, move, reroute messages at the group level.

## Implementation Notes

 * Queues: support configuration of the group identifier header key.
 * Messages: provide access to group identifier.
 * Queues: identify head message of next available message group.
 * Queues: block the trailing messages in a given message group from being consumed.
 * Consumers: track the message group of the currently acquired message(s).
 * Clustering: ensure state is replicated within the cluster as needed.

## Consequences

 * __Development:__ No changes to the development process.
 * __Release:__ No changes to the release process.
 * __Documentation:__ User documentation will be needed to explain the feature, and the steps to configure and manage the new feature.
 * __Configuration:__ Yes: per-queue group identifier header key is configurable.  Queue state with regard to in-flight message groups needs to be visible.  Additional methods to purge/move/reroute a message group.
 * __Compatibility:__ Unlikely - new feature that would need to be enabled manually.  Applications wishing to use the feature would need to implement a message grouping policy, and ensure the processing of received message data is compliant with the desired policy.

## References

 * None.

## Contributor-in-Charge

Kenneth Giusti, <kg...@redhat.com>

## Contributors

 * Ted Ross, <tr...@redhat.com>

## Version

0.2










Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
On 06/29/2011 11:16 AM, Gordon Sim wrote:
> On 06/29/2011 11:08 AM, Alan Conway wrote:
>> Gordon's mode 1 (stick to a consumer only as long as it has unacked
>> messages) doesn't really do this: the only time it allows load to shift
>> is when the queue is idle long enough for the consumer to ack all its
>> pre-fetched messages. But if the queue is idle then we're not under
>> heavy load which is where load balancing is most important.
>
> That's not necessarily true. The queue could have plenty of messages on it and
> not be idle, but not have messages in a particular group.

Good point!

> Where I think this mode may make load balancing more adaptive is where there is
> variability in the groups and the processing times of messages.
>
> For example, consumer 1 gets assigned group 1 and processes 1 available message
> and then accepts it, there are no more messages at present for that group. It
> then gets messages sent for groups 3, 5 and 6 say. If those groups now get a lot
> of messages, consumer 1 may become quite busy. If another set of messages for
> group 1 are published, why not allow them to be processed by a different
> consumer. Assuming of course that the requirement is simply in-order processing.
>
> I'm certainly not claiming this is perfect. However it seems to me that it is
> not necessary in all cases to maintain the stickiness for the lifetime of the
> consumer.

Indeed, if load balancing over long-lived groups is the primary concern (e.g. 
stock symbols as groups) then it is important that groups do not stick to 
consumers for their lifetime.

>> This makes me think there is a case for a 3rd mode of stickiness:
>> non-stick mode. In this mode we randomly chose the next consumer when
>> the current consumer acks. Clearly this will lower the possible msg/sec
>> throughput since you are effectively doing synchronous messaging.
>> However it may still be relevant where the msg/sec throughput is low
>> anyway but the need for fair load balancing is high - e.g. very large
>> messages or messages that require a long time to process. In that case
>> the extra time added by synchronous messages may be negligible, but the
>> automatic distribution of load is very valuable.
>
> Why not just set the prefetch on the consumers to 0 or 1, and have the consumer
> accept each message as it is processed. That would seem to achieve the same
> thing and is exactly what you would do to improve the fairness of distribution
> without message groups (again at the expense of throughput).

Excellent point. My non-stick mode is just your mode 2 + prefetch = 0. Very neat.



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 06/29/2011 11:08 AM, Alan Conway wrote:
> Gordon's mode 1 (stick to a consumer only as long as it has unacked
> messages) doesn't really do this: the only time it allows load to shift
> is when the queue is idle long enough for the consumer to ack all its
> pre-fetched messages. But if the queue is idle then we're not under
> heavy load which is where load balancing is most important.

That's not necessarily true. The queue could have plenty of messages on 
it and not be idle, but not have messages in a particular group.

Where I think this mode may make load balancing more adaptive is where 
there is variability in the groups and the processing times of messages.

For example, consumer 1 gets assigned group 1, processes the 1 available 
message and then accepts it; there are no more messages at present for 
that group. It then gets messages sent for groups 3, 5 and 6, say. If 
those groups now get a lot of messages, consumer 1 may become quite 
busy. If another set of messages for group 1 is published, why not 
allow them to be processed by a different consumer? That assumes, of 
course, that the requirement is simply in-order processing.

I'm certainly not claiming this is perfect. However it seems to me that 
it is not necessary in all cases to maintain the stickiness for the 
lifetime of the consumer.

> This makes me think there is a case for a 3rd mode of stickiness:
> non-stick mode. In this mode we randomly chose the next consumer when
> the current consumer acks. Clearly this will lower the possible msg/sec
> throughput since you are effectively doing synchronous messaging.
> However it may still be relevant where the msg/sec throughput is low
> anyway but the need for fair load balancing is high - e.g. very large
> messages or messages that require a long time to process. In that case
> the extra time added by synchronous messages may be negligible, but the
> automatic distribution of load is very valuable.

Why not just set the prefetch on the consumers to 0 or 1, and have the 
consumer accept each message as it is processed? That would seem to 
achieve the same thing and is exactly what you would do to improve the 
fairness of distribution without message groups (again at the expense of 
throughput).
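
To make the prefetch point concrete, here is a rough Python sketch (a toy simulation, not Qpid code; the function name, the tick model and the consumer names are all made up for illustration). It assumes the broker hands messages to any consumer with spare credit, and that a consumer accepts a message only once it has finished processing it.

```python
def simulate(num_messages, prefetch, cost):
    """Toy model of dispatch with a per-consumer prefetch limit.

    cost maps a (hypothetical) consumer name to ticks needed per message.
    Returns how many messages each consumer ended up processing.
    """
    pending = num_messages
    held = {c: 0 for c in cost}   # delivered but not yet accepted
    busy = {c: 0 for c in cost}   # ticks left on the current message
    done = {c: 0 for c in cost}
    while pending or any(held.values()):
        # Broker: top up every consumer that still has credit.
        for c in cost:
            while pending and held[c] < prefetch:
                held[c] += 1
                pending -= 1
        # Consumers: each works for one tick, accepting when finished.
        for c in cost:
            if held[c]:
                if busy[c] == 0:
                    busy[c] = cost[c]
                busy[c] -= 1
                if busy[c] == 0:
                    held[c] -= 1
                    done[c] += 1
    return done


costs = {"fast": 1, "slow": 3}
print(simulate(8, prefetch=4, cost=costs))  # {'fast': 4, 'slow': 4}
print(simulate(8, prefetch=1, cost=costs))  # {'fast': 6, 'slow': 2}
```

With a prefetch of 4 the slow consumer is handed half the messages up front; with a prefetch of 1 the work follows whichever consumer is free, at the cost of a round trip per message.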




Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
On 06/29/2011 10:37 AM, Marnie McCormack wrote:
> Some useful reading links, for the concept in JMS terms/other
> implementations (which users might expect us to support/be close ot
> semantically). I'm not saying any of these approaches are exactly correct,
> but that the logical behaviour they support is similar.
>
> http://activemq.apache.org/message-groups.html
> http://www.ibm.com/developerworks/websphere/library/techarticles/0602_currie/0602_currie.html
> http://forum.springsource.org/showthread.php?48798-Spring-2.5.1-JMS-1.1-and-distributed-jobs-implementation
>

The ActiveMQ document gives a good, clear description. They are implementing 
Gordon's mode 2: stick to a consumer for as long as it lives.

They also raise an interesting point:

"If you have existing messages in the broker and add consumers at a later stage, 
it is a good idea to delay message dispatch start until all consumers are 
present (or at least to give enough time for them to subscribe). If you don't do 
that the first consumer will probably acquire all message groups and all 
messages will be dispatched to it. You can achieve this by using 
consumersBeforeDispatchStarts and timeBeforeDispatchStarts destination policies. "

In other words you would like to be able to shift load from a busy consumer to a 
less busy one, or have a random distribution of load that will even out on average.

Gordon's mode 1 (stick to a consumer only as long as it has unacked messages) 
doesn't really do this: the only time it allows load to shift is when the queue 
is idle long enough for the consumer to ack all its pre-fetched messages. But if 
the queue is idle then we're not under heavy load which is where load balancing 
is most important.

This makes me think there is a case for a 3rd mode of stickiness: non-stick 
mode. In this mode we randomly choose the next consumer when the current consumer 
acks. Clearly this will lower the possible msg/sec throughput since you are 
effectively doing synchronous messaging. However it may still be relevant where 
the msg/sec throughput is low anyway but the need for fair load balancing is 
high - e.g. very large messages or messages that require a long time to process. 
In that case the extra time added by synchronous messages may be negligible, but 
the automatic distribution of load is very valuable.



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Marnie McCormack <ma...@googlemail.com>.
Some useful reading links, for the concept in JMS terms/other
implementations (which users might expect us to support/be close to
semantically). I'm not saying any of these approaches are exactly correct,
but that the logical behaviour they support is similar.

http://activemq.apache.org/message-groups.html
http://www.ibm.com/developerworks/websphere/library/techarticles/0602_currie/0602_currie.html
http://forum.springsource.org/showthread.php?48798-Spring-2.5.1-JMS-1.1-and-distributed-jobs-implementation

Marnie
On Wed, Jun 29, 2011 at 10:33 AM, Marnie McCormack <
marnie.mccormack@googlemail.com> wrote:

> Message groups are typically used for items such as steps in a trade or a
> workflow process, so that all parts of a group are processed in a single
> transaction or not until all data required to process the action has
> arrived. Queues are different as the data may be entirely unrelated.
> Marnie
>   On Tue, Jun 28, 2011 at 4:10 PM, mick <mg...@redhat.com> wrote:
>
>> On Tue, 2011-06-28 at 15:51 +0100, Gordon Sim wrote:
>>
>> >
>> > E.g. imagine the group relates to some real world object being
>> > modelled
>> > and each message contains describes an update
>>
>>
>> To me that situation seems like it should be modeled as a queue.
>>
>> I think it would be very worthwhile in this discussion to figure out at
>> what point we feel that a customer ought to opt for a queue rather than
>> a message group.
>>
>>
>>
>>
>>
>>
>>
>

Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Marnie McCormack <ma...@googlemail.com>.
Message groups are typically used for items such as steps in a trade or a
workflow process, so that all parts of a group are processed in a single
transaction or not until all data required to process the action has
arrived. Queues are different as the data may be entirely unrelated.
Marnie
On Tue, Jun 28, 2011 at 4:10 PM, mick <mg...@redhat.com> wrote:

> On Tue, 2011-06-28 at 15:51 +0100, Gordon Sim wrote:
>
> >
> > E.g. imagine the group relates to some real world object being
> > modelled
> > and each message contains describes an update
>
>
> To me that situation seems like it should be modeled as a queue.
>
> I think it would be very worthwhile in this discussion to figure out at
> what point we feel that a customer ought to opt for a queue rather than
> a message group.
>
>
>
>
>
>
>

Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 06/28/2011 04:23 PM, Robert Godfrey wrote:
> To a certain extent it seem to me that whether it's modelled by multiple
> queues or a single queue with groups could be seen as an implementation
> detail within the broker... The user knows what they want, and we just need
> to provide them with a language for getting that functionality.  Certainly
> from the use cases which have been presented to me for these sort of
> situations, the number of groups is large, there identities are not known up
> front, and their lifetimes relatively short... thus the user wants a single
> stable address from which they can receive their messages...

Right, and that's what I meant here by not wanting to have to configure 
distinct queues. Your phrasing is better however. It is about presenting 
a single stable address for consumers.



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Robert Godfrey <ro...@gmail.com>.
On 28 June 2011 17:13, Gordon Sim <gs...@redhat.com> wrote:

> On 06/28/2011 04:10 PM, mick wrote:
>
>> On Tue, 2011-06-28 at 15:51 +0100, Gordon Sim wrote:
>>
>>
>>> E.g. imagine the group relates to some real world object being
>>> modelled
>>> and each message contains describes an update
>>>
>>
>>
>> To me that situation seems like it should be modeled as a queue.
>>
>
> Fair point, however ...
>
>
>  I think it would be very worthwhile in this discussion to figure out at
>> what point we feel that a customer ought to opt for a queue rather than
>> a message group.
>>
>
> ... I think one of the criteria is how many groups there are and how
> dynamic the set of groups is.
>
> In this example it might be that there are lots and lots of objects, but
> only a few processing 'engines'. Further, the exact set of objects might be
> dynamic. I don't want to have to tie the processing engines to specific
> queues for each object, I want them to adapt to the changing system.



To a certain extent it seems to me that whether it's modelled by multiple
queues or a single queue with groups could be seen as an implementation
detail within the broker... The user knows what they want, and we just need
to provide them with a language for getting that functionality.  Certainly
from the use cases which have been presented to me for these sorts of
situations, the number of groups is large, their identities are not known up
front, and their lifetimes are relatively short... thus the user wants a
single stable address from which they can receive their messages... but once
they start getting messages for a particular group, they want to receive all
messages for that group.  A twist is whether they want
to interleave messages from different groups on the same consumer...

-- Rob

Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by mick <mg...@redhat.com>.
On Tue, 2011-06-28 at 16:13 +0100, Gordon Sim wrote:
> On 06/28/2011 04:10 PM, mick wrote:
> > On Tue, 2011-06-28 at 15:51 +0100, Gordon Sim wrote:
> >
> >>
> >> E.g. imagine the group relates to some real world object being
> >> modelled
> >> and each message contains describes an update
> >
> >
> > To me that situation seems like it should be modeled as a queue.
> 
> Fair point, however ...
> 
> > I think it would be very worthwhile in this discussion to figure out at
> > what point we feel that a customer ought to opt for a queue rather than
> > a message group.
> 
> ... I think one of the criteria is how many groups there are and how 
> dynamic the set of groups is.
> 
> In this example it might be that there are lots and lots of objects, but 
> only a few processing 'engines'. Further, the exact set of objects might 
> be dynamic. I don't want to have to tie the processing engines to 
> specific queues for each object, I want them to adapt to the changing 
> system.

OK -- so in general, 

queues should be used if 
  (1) their identities are known at start-up, or
  (2) their number is relatively stable over the life of the app

message groups should be used if
  (1) their existence and numbers are not known in advance, or
  (2) they may appear and disappear rapidly


When message groups *are* used, the Queue that they are sent through
represents some stable aspect of the message, while the Group represents
a transient aspect.

For example, a traffic management system might have a queue called
TrafficAccidentQueue -- but each individual accident is represented by a
separate MessageGroup within that queue.
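
As a rough illustration of that split (plain Python, not any Qpid API; the "accident-id" header name and the message layout are invented for the example), the queue is the one stable address and the groups only show up as header values on the messages passing through it:

```python
from collections import OrderedDict

GROUP_HEADER = "accident-id"  # hypothetical header carrying the group id


def messages_by_group(queue):
    """Split one queue's messages into per-group lists, keeping arrival order."""
    groups = OrderedDict()
    for message in queue:
        groups.setdefault(message[GROUP_HEADER], []).append(message["body"])
    return groups


# One stable queue; the set of groups is only discovered as messages arrive.
traffic_accident_queue = [
    {"accident-id": "A17", "body": "reported"},
    {"accident-id": "B02", "body": "reported"},
    {"accident-id": "A17", "body": "ambulance dispatched"},
    {"accident-id": "A17", "body": "cleared"},
    {"accident-id": "B02", "body": "cleared"},
]

for group_id, bodies in messages_by_group(traffic_accident_queue).items():
    print(group_id, bodies)
```

Nothing about A17 or B02 had to be configured in advance, which is the case where groups seem a better fit than one queue per accident.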








Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 06/28/2011 04:10 PM, mick wrote:
> On Tue, 2011-06-28 at 15:51 +0100, Gordon Sim wrote:
>
>>
>> E.g. imagine the group relates to some real world object being
>> modelled
>> and each message contains describes an update
>
>
> To me that situation seems like it should be modeled as a queue.

Fair point, however ...

> I think it would be very worthwhile in this discussion to figure out at
> what point we feel that a customer ought to opt for a queue rather than
> a message group.

... I think one of the criteria is how many groups there are and how 
dynamic the set of groups is.

In this example it might be that there are lots and lots of objects, but 
only a few processing 'engines'. Further, the exact set of objects might 
be dynamic. I don't want to have to tie the processing engines to 
specific queues for each object, I want them to adapt to the changing 
system.



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by mick <mg...@redhat.com>.
On Tue, 2011-06-28 at 15:51 +0100, Gordon Sim wrote:

> 
> E.g. imagine the group relates to some real world object being
> modelled 
> and each message contains describes an update 


To me that situation seems like it should be modeled as a queue.

I think it would be very worthwhile in this discussion to figure out at
what point we feel that a customer ought to opt for a queue rather than
a message group.  







Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 06/28/2011 03:31 PM, Alan Conway wrote:
> It seems to me that the first mode contains the second: the client can
> simply delay acknowledgement of at least one message until it is happy
> that the entire group is processed (which could be the entire life of
> the consumer, or some shorter span at the clients discretion.)

Where the group has a distinct scope and should be treated as a single 
logical unit, it is natural to acknowledge all the messages comprising 
the group at once. There I agree that the first scope is sufficient.

However the group may last 'forever'; it could be that there is never a 
point where there are no more messages for the group ever likely to be 
published.

E.g. imagine the group relates to some real world object being modelled 
and each message describes an update to its state in relative 
terms (e.g. temperature increased by 5 degrees or whatever).

In order to process these messages the consumer must maintain the 
current state of the object, which is a function of all the messages 
seen thus far.

In this case it would be valuable to have all the messages for the group 
go to the same consumer while it was alive, to avoid anyone else having 
to reboot the initial state somehow (let's imagine that's 'expensive').

It would be annoying for the application to artificially keep one 
message unaccepted just to keep the association active. This is where I 
think the second scope is useful.
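
A small Python sketch of why the accumulated state matters (purely illustrative; the class name, the starting value of 20 and the update values are made up): with relative updates, a consumer that picks the group up part-way through ends with the wrong answer unless it first rebuilds the state.

```python
class TemperatureTracker:
    """Consumer-side state for one modelled object (hypothetical example)."""

    def __init__(self, initial):
        self.value = initial

    def apply(self, delta):
        # Each message is a relative change, so the result depends on
        # every message seen so far.
        self.value += delta
        return self.value


updates = [+5, -2, +3, +1]  # relative updates, in publication order

# One consumer sees the whole group: its state matches the real object.
sticky = TemperatureTracker(initial=20)
for delta in updates:
    sticky.apply(delta)

# A second consumer takes over halfway without the accumulated state.
newcomer = TemperatureTracker(initial=20)
for delta in updates[2:]:
    newcomer.apply(delta)

print(sticky.value, newcomer.value)  # 27 24
```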



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
On 06/28/2011 03:06 PM, Gordon Sim wrote:
> On 06/28/2011 02:34 PM, Ken Giusti wrote:
>> This is great feedback - thanks to all.
>>
>> I think the term "Message Groups" may not be the best term to use as
>> the name for this feature, as proposed in the qip. I agree that the
>> term "Message Group" has historically implied a sticky consumer - at
>> least that's what Google leads me to believe.
>>
>> The qip wasn't clear regarding the priority of "sticky consumer"
>> support - as Gordon correctly points out, this qip should lay the
>> foundation for supporting both behaviours, as (I believe) there is
>> value in both approaches.
>>
>> So: 1) I'd like to rename the QIP - is there a better term to use for
>> the "non-sticky" case than "Message Groups"?
>
> I actually quite like 'Message Groups' as a term, I just think there are
> different ways of treating a group.
>
> I think that rather than 'sticky' and 'non-sticky' we really just have two modes
> of stickiness, i.e. two scopes for the association between group and consumer.
>
> In the first mode the group is tied to a consumer only while that consumer has
> outstanding, unaccepted messages.
>
> This is sufficient to guarantee strict fifo processing in a group.
>
> It also meets Marnie's requirement of having the 'group as a whole' processed by
> the client without having to have any explicit end-of-group signal that the
> broker needs to recognise. The client implicitly signals the end-of-group when
> it acknowledges the messages in the group.
>
> The second mode extends the scope of the association. The group here is tied to
> a consumer for the life of that consumer. This meets Rob's further requirement
> of allowing any state required and/or built-up during processing of the group to
> be contained within the single consumer.
>
>

It seems to me that the first mode contains the second: the client can simply 
delay acknowledgement of at least one message until it is happy that the entire 
group is processed (which could be the entire life of the consumer, or some 
shorter span at the client's discretion).



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 06/28/2011 02:34 PM, Ken Giusti wrote:
> This is great feedback - thanks to all.
>
> I think the term "Message Groups" may not be the best term to use as
> the name for this feature, as proposed in the qip.  I agree that the
> term "Message Group" has historically implied a sticky consumer - at
> least that's what Google leads me to believe.
>
> The qip wasn't clear regarding the priority of "sticky consumer"
> support - as Gordon correctly points out, this qip should lay the
> foundation for supporting both behaviours, as (I believe) there is
> value in both approaches.
>
> So: 1) I'd like to rename the QIP - is there a better term to use for
> the "non-sticky" case than "Message Groups"?

I actually quite like 'Message Groups' as a term, I just think there are 
different ways of treating a group.

I think that rather than 'sticky' and 'non-sticky' we really just have 
two modes of stickiness, i.e. two scopes for the association between 
group and consumer.

In the first mode the group is tied to a consumer only while that 
consumer has outstanding, unaccepted messages.

This is sufficient to guarantee strict fifo processing in a group.

It also meets Marnie's requirement of having the 'group as a whole' 
processed by the client without having to have any explicit end-of-group 
signal that the broker needs to recognise. The client implicitly signals 
the end-of-group when it acknowledges the messages in the group.

The second mode extends the scope of the association. The group here is 
tied to a consumer for the life of that consumer. This meets Rob's 
further requirement of allowing any state required and/or built-up 
during processing of the group to be contained within the single consumer.
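
To pin down the two scopes, here is a minimal Python sketch of the bookkeeping a broker might keep (the class, method and flag names are hypothetical, and requeuing of unaccepted messages on consumer failure is deliberately left out):

```python
from collections import defaultdict


class GroupRouter:
    """Hypothetical broker-side table mapping group id -> owning consumer."""

    def __init__(self, sticky_for_consumer_lifetime=False):
        self.sticky = sticky_for_consumer_lifetime
        self.owner = {}                      # group id -> consumer name
        self.unaccepted = defaultdict(int)   # group id -> outstanding messages

    def deliver(self, group, candidate):
        """Pick the consumer for the next message in `group`.

        `candidate` is whichever consumer the broker would otherwise choose;
        it is used only when the group has no current owner.
        """
        consumer = self.owner.setdefault(group, candidate)
        self.unaccepted[group] += 1
        return consumer

    def accept(self, group):
        """Record that the owner accepted one message of `group`."""
        self.unaccepted[group] -= 1
        if self.unaccepted[group] == 0 and not self.sticky:
            # Mode 1: the association ends with the last outstanding message.
            del self.owner[group]

    def consumer_closed(self, consumer):
        """Both modes release a consumer's groups when it goes away."""
        for group in [g for g, c in self.owner.items() if c == consumer]:
            del self.owner[group]
            self.unaccepted[group] = 0


mode1 = GroupRouter()
print(mode1.deliver("g1", "c1"))  # c1 becomes the owner
mode1.accept("g1")                # nothing outstanding, association dropped
print(mode1.deliver("g1", "c2"))  # c2 may now take the group
```

The only difference between the modes in this sketch is whether `accept` is allowed to drop the association; in the second mode only `consumer_closed` does.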




Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Ken Giusti <kg...@redhat.com>.
This is great feedback - thanks to all.

I think the term "Message Groups" may not be the best term to use as the name for this feature, as proposed in the qip.  I agree that the term "Message Group" has historically implied a sticky consumer - at least that's what Google leads me to believe.  

The qip wasn't clear regarding the priority of "sticky consumer" support - as Gordon correctly points out, this qip should lay the foundation for supporting both behaviours, as (I believe) there is value in both approaches.

So:
1) I'd like to rename the QIP - is there a better term to use for the "non-sticky" case than "Message Groups"?

2) For the "non-sticky consumer" case: I like Gordon's suggestion of allowing delivery of a sub-sequence of the pending message group to a particular consumer, and using the capacity to set the blocking granularity.  I'll update the qip to include this.

3) For the "sticky consumer" case it sounds like there are a couple of variations:

a) all messages in a group that are present on the queue will be delivered to a single, arbitrarily selected consumer (best effort w.r.t. client lifespan, no state mapping the group-id to consumer preserved once all messages from the group have been consumed).

b) all messages in a group are delivered to a single, arbitrarily selected consumer for the duration of the message group (ditto client lifespan, state mapping with some degree of persistence (?), End-of-Group message marking or sequencing (?))

c) other "non-sticky" constraints we need to consider?
 
thanks,


-K

----- Original Message -----
> On 06/28/2011 11:39 AM, Gordon Sim wrote:
> > So I think the requirements are:
> >
> > (1) message group must always be processed in order
> >
> > (2) all messages in a group are processed by the same consumer
> >
> > (3) the consumer will only complete processing having received the
> > 'final' message in the group
> >
> > Where (3) implies (2) and (2) implies (1).
> 
> Actually on further consideration, I think I need to qualify this last
> sentence a little.
> 
> The broker doesn't know what the 'final' message is. However it
> requires
> some state to track stickiness of group to consumer. In use case (3) I
> actually think the assumption on stickiness is the same as for my
> preferred implementation of (1). I.e. the association between group
> and
> consumer only lasts for as long as there are outstanding unaccepted
> messages.
> 
> In that sense (3) can be met along with (1). There is of course a
> sense
> where it may still be true that (3) implies that all the messages in a
> group are processed by the same consumer. However that is really based
> on the external consideration of whether there will be any further use
> of the same group id.
> 
> So I do think that the key distinction is in the scope of the
> association between group and consumer. It is either scoped to the
> life
> of the consumer (satisfying requirement 2) or it is scoped to the
> existence of outstanding, unaccepted messages in the group (satisfying
> requirements 1 and 3).
> 



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 06/28/2011 11:39 AM, Gordon Sim wrote:
> So I think the requirements are:
>
> (1) message group must always be processed in order
>
> (2) all messages in a group are processed by the same consumer
>
> (3) the consumer will only complete processing having received the
> 'final' message in the group
>
> Where (3) implies (2) and (2) implies (1).

Actually on further consideration, I think I need to qualify this last 
sentence a little.

The broker doesn't know what the 'final' message is. However it requires 
some state to track stickiness of group to consumer. In use case (3) I 
actually think the assumption on stickiness is the same as for my 
preferred implementation of (1). I.e. the association between group and 
consumer only lasts for as long as there are outstanding unaccepted 
messages.

In that sense (3) can be met along with (1). There is of course a sense 
where it may still be true that (3) implies that all the messages in a 
group are processed by the same consumer. However that is really based 
on the external consideration of whether there will be any further use 
of the same group id.

So I do think that the key distinction is in the scope of the 
association between group and consumer. It is either scoped to the life 
of the consumer (satisfying requirement 2) or it is scoped to the 
existence of outstanding, unaccepted messages in the group (satisfying 
requirements 1 and 3).



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 06/28/2011 09:57 AM, Marnie McCormack wrote:
> In JMS, the sequence id would be used to maintain order, but the expectation
> is that the group are processed as a whole on receipt of the final message
> in the group rather than simply shared out between consumers and processed
> in order.

I think that is a third variation (further restriction really) on the 
use case. I.e. where message group is used to piece together a large 
unit from discrete messages.

So I think the requirements are:

(1) message group must always be processed in order

(2) all messages in a group are processed by the same consumer

(3) the consumer will only complete processing having received the 
'final' message in the group

Where (3) implies (2) and (2) implies (1).

The third requirement essentially rules out an implementation of the 
first requirement that refuses to deliver the next message in the group 
until the previous message is accepted.

The second requirement is important where there is a desire to contain 
all the necessary state for processing a group of messages within a 
single consumer. I agree that is important and should be supported. 
However I think it is not always required and it should be possible to 
drop that if not needed as that could lead to more adaptive 
load-balancing while meeting the first requirement.

So I think the configuration involved is:

(a) identification of the header that signifies a group within a given 
queue, which implicitly enables requirement (1)

(b) an indication that 'stickiness' of group to consumer is required 
(beyond simply meeting the first requirement)

I think we can avoid the need for any explicit configuration of 
requirement (3). Refusing to deliver the next message in the group until 
the previous message is accepted is in my view unnecessary to meet (1) 
and could be emulated more or less by controlling prefetch.
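
Sketched in Python, the two settings might look something like this (the class and field names are placeholders for illustration, not proposed Qpid option names):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class GroupingConfig:
    """Hypothetical per-queue settings for (a) and (b)."""

    group_header: Optional[str] = None  # (a) None means grouping is off
    sticky_to_consumer: bool = False    # (b) keep group on one consumer

    def group_of(self, headers):
        """Return the group id for a message, or None if it is ungrouped."""
        if self.group_header is None:
            return None
        return headers.get(self.group_header)


in_order_only = GroupingConfig(group_header="group-id")
sticky = GroupingConfig(group_header="group-id", sticky_to_consumer=True)

print(in_order_only.group_of({"group-id": "order-42"}))  # order-42
print(in_order_only.group_of({"other": "x"}))            # None
```

Note there is no third setting for requirement (3): under this reading it falls out of (a) plus whatever prefetch the client chooses.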



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Marnie McCormack <ma...@googlemail.com>.
My reading of the JMS spec and common implmentations/user expectations would
be the same as Rob's - i.e. the message order is not the only characteristic
used to define group support.

In JMS, the sequence id would be used to maintain order, but the expectation
is that the group is processed as a whole on receipt of the final message
in the group rather than simply shared out between consumers and processed
in order.

Marnie

On Mon, Jun 27, 2011 at 8:00 PM, Gordon Sim <gs...@redhat.com> wrote:

> On 06/27/2011 07:58 PM, Gordon Sim wrote:
>
>> For most applications however I believe you can still keep the strict
>> ordering even allowing for 'requeuing' due to rollback or connection
>> failure.
>>
>
> Where you want the stronger guarantee that a message is not even delivered
> until all preceding messages in the group are accepted the clients can set a
> prefetch of 1 for that queue.

Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 06/27/2011 07:58 PM, Gordon Sim wrote:
> For most applications however I believe you can still keep the strict
> ordering even allowing for 'requeuing' due to rollback or connection
> failure.

Where you want the stronger guarantee that a message is not even 
delivered until all preceding messages in the group are accepted the 
clients can set a prefetch of 1 for that queue.
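
A minimal Python sketch of why a prefetch of 1 gives that behaviour. This
is a toy model of a credit window, not the Qpid client API: the names
Subscription, prefetch and drain are assumptions invented for the example.

```python
from collections import deque


class Subscription:
    """Toy subscription with a credit window of `prefetch` messages."""

    def __init__(self, prefetch):
        self.prefetch = prefetch
        self.unaccepted = deque()   # delivered but not yet accepted

    def can_deliver(self):
        # Credit is available while fewer than `prefetch` messages are in doubt.
        return len(self.unaccepted) < self.prefetch

    def deliver(self, message):
        if not self.can_deliver():
            raise RuntimeError("no credit")
        self.unaccepted.append(message)

    def accept(self):
        # Accept the oldest outstanding message, freeing one unit of credit.
        return self.unaccepted.popleft()


def drain(queue, sub):
    """Deliver messages in FIFO order until the subscription has no credit."""
    sent = []
    while queue and sub.can_deliver():
        message = queue.popleft()
        sub.deliver(message)
        sent.append(message)
    return sent
```

With Subscription(1), drain hands over a single message and then stops
until accept() is called, so message n + 1 is never delivered before
message n is accepted. A larger window lets the broker run ahead of the
consumer.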


Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 06/27/2011 07:54 PM, Robert Godfrey wrote:
> On 27 June 2011 17:41, Gordon Sim <gs...@redhat.com> wrote:
>
>> On 06/27/2011 05:25 PM, Robert Godfrey wrote:
>>
>>> So, in general this looks good, and is something we could support in the
>>> Java Broker... My one comment would be that I think the more standard
>>> behaviour for a "Group" is to ensure all messages in the same group are
>>> delivered (in order) to the same consumer, with the notion of blocking
>>> delivery of the nth + 1 element of the group until the nth element is
>>> acknowledged seeming more like an alternate behaviour.
>>>
>>
>> I don't see a lot of value in blocking delivery of the nth + 1 element until
>> the nth element is acknowledged. I think the important thing is to deliver
>> the messages within a group in order in spite of having competing consumers.
>> You can do that simply by ensuring that the nth + 1 message here goes to the
>> same consumer as the nth message.
>>
>> I agree that a common approach is for a group to be 'sticky' to a given
>> consumer.
>>
>> However I can also see value in a scheme where that stickiness ends not
>> when the consumer is closed, but when there are no outstanding in-doubt
>> messages for the group. In the case where the requirement is simply in-order
>> processing of messages in a group this might allow for more adaptive load
>> balancing across parallel consumers.
>>
>>
> Sure, but the two schemes are not interchangeable... if the reason that the
> client wants the "sticky" behaviour is that they are going to maintain state
> at that and only that client for the processing of that group, then the
> ordering guarantee does not help them (and indeed may be contrary to their
> expectations).  The question is whether people are using grouping for strict
> ordering, or to ensure a locality for processing messages of the same
> group.

Absolutely. I just think that both cases are valuable and a lot of the 
implementation is common between them.

> With ordering, are you suggesting a strict order based on enqueue time into
> the queue, or upon a provided sequence number?

It would be based on enqueue order, i.e. strict FIFO for a given group.

>  If a message gets
> re-enqueued does it retain its original order with respect to the rest of
> the group?

By re-enqueued you mean becoming available for redelivery after being 
released explicitly or implicitly? If so then yes, I believe it must 
keep its original order.

That does of course mean that clients can't accept out of order and 
can't arbitrarily release messages. For most applications however I 
believe you can still keep the strict ordering even allowing for 
'requeuing' due to rollback or connection failure.
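
One way to picture a released message keeping its original position is
the following minimal Python sketch. It is only an illustration under
assumed names (OrderedGroup, acquire, release, release_all); it tags each
message with an enqueue sequence number and makes no claim about how
either broker stores messages.

```python
import heapq
import itertools


class OrderedGroup:
    """Toy per-group store that keeps enqueue order across releases."""

    def __init__(self):
        self._seq = itertools.count()
        self._available = []    # min-heap of (enqueue sequence, body)
        self._acquired = {}     # enqueue sequence -> body, delivered, in doubt

    def enqueue(self, body):
        heapq.heappush(self._available, (next(self._seq), body))

    def acquire(self):
        """Deliver the available message with the lowest enqueue sequence."""
        seq, body = heapq.heappop(self._available)
        self._acquired[seq] = body
        return seq, body

    def accept(self, seq):
        del self._acquired[seq]

    def release(self, seq):
        # The message goes back under its original sequence number, so it
        # sorts ahead of everything enqueued after it.
        heapq.heappush(self._available, (seq, self._acquired.pop(seq)))

    def release_all(self):
        """Model a rollback or connection failure: release every in-doubt message."""
        for seq in list(self._acquired):
            self.release(seq)
```

After release_all(), the next acquire() returns the oldest unaccepted
message again, so redelivery follows the original FIFO order. The sketch
does not stop a client from releasing a single message out of order,
which is the restriction noted above.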



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Robert Godfrey <ro...@gmail.com>.
On 27 June 2011 17:41, Gordon Sim <gs...@redhat.com> wrote:

> On 06/27/2011 05:25 PM, Robert Godfrey wrote:
>
>> So, in general this looks good, and is something we could support in the
>> Java Broker... My one comment would be that I think the more standard
>> behaviour for a "Group" is to ensure all messages in the same group are
>> delivered (in order) to the same consumer, with the notion of blocking
>> delivery of the nth + 1 element of the group until the nth element is
>> acknowledged seeming more like an alternate behaviour.
>>
>
> I don't see a lot of value in blocking delivery of the nth + 1 element until
> the nth element is acknowledged. I think the important thing is to deliver
> the messages within a group in order in spite of having competing consumers.
> You can do that simply by ensuring that the nth + 1 message here goes to the
> same consumer as the nth message.
>
> I agree that a common approach is for a group to be 'sticky' to a given
> consumer.
>
> However I can also see value in a scheme where that stickiness ends not
> when the consumer is closed, but when there are no outstanding in-doubt
> messages for the group. In the case where the requirement is simply in-order
> processing of messages in a group this might allow for more adaptive load
> balancing across parallel consumers.
>
>
Sure, but the two schemes are not interchangeable... if the reason that the
client wants the "sticky" behaviour is that they are going to maintain state
at that and only that client for the processing of that group, then the
ordering guarantee does not help them (and indeed may be contrary to their
expectations).  The question is whether people are using grouping for strict
ordering, or to ensure a locality for processing messages of the same
group.

With ordering, are you suggesting a strict order based on enqueue time into
the queue, or upon a provided sequence number?  If a message gets
re-enqueued does it retain its original order with respect to the rest of
the group?

-- Rob

Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 06/27/2011 05:25 PM, Robert Godfrey wrote:
> So, in general this looks good, and is something we could support in the
> Java Broker... My one comment would be that I think the more standard
> behaviour for a "Group" is to ensure all messages in the same group are
> delivered (in order) to the same consumer, with the notion of blocking
> delivery of the nth + 1 element of the group until the nth element is
> acknowledged seeming more like an alternate behaviour.

I don't see a lot of value in blocking delivery of the nth + 1 element
until the nth element is acknowledged. I think the important thing is to 
deliver the messages within a group in order in spite of having 
competing consumers. You can do that simply by ensuring that the nth + 1 
message here goes to the same consumer as the nth message.

I agree that a common approach is for a group to be 'sticky' to a given 
consumer.

However I can also see value in a scheme where that stickiness ends not 
when the consumer is closed, but when there are no outstanding in-doubt 
messages for the group. In the case where the requirement is simply 
in-order processing of messages in a group this might allow for more 
adaptive load balancing across parallel consumers.
