Posted to dev@qpid.apache.org by Ken Giusti <kg...@redhat.com> on 2011/07/01 13:04:32 UTC

Re: Proposed QIP: Support for Message Grouping in the broker.

Folks,

Here's a second draft of the proposal.  I've tried to incorporate the feedback provided during the last week.  And I have tried to limit the proposed feature set a bit more.

Opinions welcome, thanks -

-K


# Message Groups

## Status

Draft

## Summary

This document describes a new feature that would allow an application to
classify a set of related messages as belonging to a group. This document also
describes two policies that the broker could apply when delivering a message
group to one or more consumers.


## Problem

It would be useful to give an application the ability to classify a set of
messages as belonging to a single unit of work.  Furthermore, if the broker can
identify messages belonging to the same unit of work, it can enforce policies
that control how that unit of work can be consumed.

For example, it may be desirable to guarantee that a particular set of
messages are consumed by the same client, even in the case where there are
multiple clients consuming from the same queue.

In a different scenario, it may be permissible for different clients to consume
messages from the same group, as long as it can be guaranteed that the
individual messages are not processed in parallel.  In other words, the broker
would ensure that messages are processed by consumers in the same order in
which they were enqueued.

For example, assume we have a shopping application that manages items in a
virtual shopping cart.  A user may add an item to their shopping cart, then
change their mind and remove it.  If the application sends an "add" message to
the broker, immediately followed by a "remove" message, they will be queued in
the proper order - "add", then "remove".

However, if there are multiple consumers, it is possible that once a consumer
acquires the "add" message, a different consumer may acquire the "remove"
message.  This allows both messages to be processed in parallel, which could
result in the "remove" operation being performed before the "add" operation.


## Solution

This QIP proposes the following:

1) provide the ability for a message producer to designate a set of messages
as belonging to the same group.

2) allow the broker to identify messages that belong to the same group.

3) define policies for the broker that control the delivery of messages
belonging to the same group.

For #1:

The sending application would define a message header that would contain the
message's group identifier.  The group identifier stored in that header field
would be a string value determined by the application.  Messages from the same
group would have the same group identifier value.
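Here is a minimal producer-side sketch. The `Message` class stands in for a client message that carries application headers. The header key `qpid.group-id` is only the default floated later in this thread. Neither name is an existing Qpid API.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

# Hypothetical header key; the thread suggests "qpid.group-id" as a default.
GROUP_HEADER = "qpid.group-id"


@dataclass
class Message:
    body: str
    properties: Dict[str, Any] = field(default_factory=dict)


def make_group(group_id: str, bodies: List[str]) -> List[Message]:
    """Tag every message of one unit of work with the same group identifier."""
    return [Message(body, {GROUP_HEADER: group_id}) for body in bodies]


cart = make_group("cart-42", ["add item 7", "remove item 7"])
for msg in cart:
    print(msg.properties[GROUP_HEADER], msg.body)
```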

For #2:

The key for the header that contains the group identifier would be provided to
the broker via configuration.

From the broker's perspective, the number of different group id values would be
unlimited.  And the value of group identifiers would not be provided ahead of
time by configuration: the broker must learn them at runtime.
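The broker-side lookup can be sketched with a hypothetical `GroupedQueue` that is configured only with the header key. It creates a group entry the first time an id appears, so no list of ids and no hard limit is configured. Putting messages without the header into one default group (`__no_group__`) is an assumption, because the proposal does not cover that case.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Message:
    body: str
    properties: Dict[str, Any] = field(default_factory=dict)


@dataclass
class GroupState:
    group_id: str
    enqueued: int = 0


class GroupedQueue:
    """Only the header *key* is configured; group ids are learned at runtime."""

    def __init__(self, group_header_key: str, default_group: str = "__no_group__") -> None:
        self.group_header_key = group_header_key
        # Assumption: messages lacking the header share one default group.
        self.default_group = default_group
        self.groups: Dict[str, GroupState] = {}  # no upper bound is configured

    def group_of(self, msg: Message) -> str:
        return str(msg.properties.get(self.group_header_key, self.default_group))

    def enqueue(self, msg: Message) -> GroupState:
        gid = self.group_of(msg)
        if gid not in self.groups:  # first message of a previously unseen group
            self.groups[gid] = GroupState(gid)
        state = self.groups[gid]
        state.enqueued += 1
        return state


q = GroupedQueue(group_header_key="app.order-id")
for body, props in [("m1", {"app.order-id": "A"}), ("m2", {"app.order-id": "B"}),
                    ("m3", {"app.order-id": "A"}), ("m4", {})]:
    q.enqueue(Message(body, props))
print({gid: s.enqueued for gid, s in q.groups.items()})
```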

For #3:

This QIP defines two message group policies.  Additional policies may be
defined in the future.

Policy #1: Exclusive Consumer

With this policy, the broker would guarantee that all messages in a group
would be delivered to the same client.

This policy would be configured on a per-queue basis.  When the first message
of a new message group becomes available for delivery, the broker will
associate that group with the next available consumer.  The broker would then
guarantee that all messages from that group are delivered to that consumer
only.

The broker will maintain the group/client association for the lifetime of the
client.  Should the client die or cancel its subscription, any unacknowledged
messages in the group will be assigned to a different client (preserving
message order).  Group/client associations are not maintained across broker
restart.  These associations must be replicated in a clustered broker.
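The Exclusive Consumer bookkeeping can be sketched as a hypothetical, single-threaded, in-memory model with no persistence and no clustering. `ExclusiveGroupQueue`, `next_for`, `ack_all` and `cancel` are illustrative names and do not come from broker code.

```python
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple

Msg = Tuple[str, str]  # (group id, body)


class ExclusiveGroupQueue:
    """'Exclusive Consumer' sketch: a group is bound to the consumer that
    received its first message, for as long as that consumer exists."""

    def __init__(self) -> None:
        self.messages: Deque[Msg] = deque()
        self.owner: Dict[str, str] = {}          # group id -> consumer
        self.unacked: Dict[str, List[Msg]] = {}  # consumer -> delivered, not yet acked

    def subscribe(self, consumer: str) -> None:
        self.unacked.setdefault(consumer, [])

    def enqueue(self, group: str, body: str) -> None:
        self.messages.append((group, body))

    def next_for(self, consumer: str) -> Optional[Msg]:
        """Deliver the first message whose group is unbound or bound to `consumer`."""
        for i, (group, body) in enumerate(self.messages):
            if self.owner.get(group, consumer) == consumer:
                self.owner[group] = consumer
                del self.messages[i]  # safe: we return before iterating further
                self.unacked[consumer].append((group, body))
                return group, body
        return None

    def ack_all(self, consumer: str) -> None:
        self.unacked[consumer].clear()  # the group stays bound to this consumer

    def cancel(self, consumer: str) -> None:
        """Consumer died or cancelled: requeue its unacked messages at the head
        (keeping their relative order) and release its groups."""
        for msg in reversed(self.unacked.pop(consumer, [])):
            self.messages.appendleft(msg)
        for group in [g for g, c in self.owner.items() if c == consumer]:
            del self.owner[group]


q = ExclusiveGroupQueue()
q.subscribe("c1")
q.subscribe("c2")
for group, body in [("A", "a1"), ("B", "b1"), ("A", "a2")]:
    q.enqueue(group, body)
first = q.next_for("c1")   # ('A', 'a1'): group A is now bound to c1
second = q.next_for("c2")  # ('B', 'b1'): a2 is skipped, it belongs to c1
third = q.next_for("c2")   # None
q.cancel("c1")             # a1 goes back to the head, group A is released
fourth = q.next_for("c2")  # ('A', 'a1')
print(first, second, third, fourth)
```

The model requeues unacknowledged messages at the head of the queue. That keeps them ahead of any later messages of the same group, so per-group order survives a consumer failure.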


Policy #2: Sequenced Consumers

With this policy, the broker would guarantee that the order in which messages
in a group are processed by consumers is the same order in which the messages
were enqueued.  This guarantee would be upheld even if the messages of the
group are processed by different consumers.  No two messages from the same
group would be processed in parallel by different consumers.

Specifically, for any given group, the broker allows only the first N messages
in the group to be available for delivery to a particular consumer.  The value
of N would be determined by the selected consumer's configured prefetch
capacity.  The broker blocks access to the remaining messages in that group by
any other consumer.  Once the selected consumer has acknowledged that first set
of delivered messages, the broker allows the next messages in the group to be
available for delivery.  The next set of messages may be delivered to a
different consumer.
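The Sequenced Consumers rule can be sketched as follows, assuming that one `fetch` call corresponds to one prefetch window of a consumer. All names are hypothetical. The linear scan favours clarity over speed.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple

Msg = Tuple[str, str]  # (group id, body)


class SequencedGroupQueue:
    """'Sequenced Consumers' sketch: while one consumer holds unacknowledged
    messages of a group, the rest of that group is blocked for everyone else."""

    def __init__(self) -> None:
        self.messages: Deque[Msg] = deque()
        self.holder: Dict[str, str] = {}       # group -> consumer holding unacked msgs
        self.outstanding: Dict[str, int] = {}  # group -> number of unacked msgs

    def enqueue(self, group: str, body: str) -> None:
        self.messages.append((group, body))

    def fetch(self, consumer: str, prefetch: int) -> List[Msg]:
        """Acquire up to `prefetch` messages for `consumer`, skipping blocked groups."""
        batch: List[Msg] = []
        i = 0
        while i < len(self.messages) and len(batch) < prefetch:
            group, body = self.messages[i]
            if self.holder.get(group, consumer) == consumer:
                self.holder[group] = consumer
                self.outstanding[group] = self.outstanding.get(group, 0) + 1
                del self.messages[i]  # the next message slides into position i
                batch.append((group, body))
            else:
                i += 1
        return batch

    def ack(self, msgs: List[Msg]) -> None:
        """Acknowledge messages; a fully acknowledged group is unblocked."""
        for group, _ in msgs:
            self.outstanding[group] -= 1
            if self.outstanding[group] == 0:
                del self.outstanding[group]
                del self.holder[group]


q = SequencedGroupQueue()
for group, body in [("A", "a1"), ("A", "a2"), ("A", "a3"), ("B", "b1")]:
    q.enqueue(group, body)
first = q.fetch("c1", prefetch=2)   # [('A', 'a1'), ('A', 'a2')]
second = q.fetch("c2", prefetch=2)  # [('B', 'b1')]: a3 is blocked behind c1
q.ack(first)                        # c1 finished: group A is free again
third = q.fetch("c2", prefetch=2)   # [('A', 'a3')], now on a different consumer
print(first, second, third)
```

Suppose a consumer uses a prefetch of 1 and acknowledges every message. This sketch then reduces to the "non-stick" behaviour discussed elsewhere in this thread, which distributes load better at the cost of throughput.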

This policy would be configured on a per-queue basis.  Configuration would
include designating the key of the application header that specifies the group
id.

Note well that, with this policy, the consuming application has to:

1. ensure that it has completely processed the data in a received message
before accepting that message, as described in Section 2.6.2, "Transfer of
Responsibility", of the AMQP 0-10 specification.

2. ensure that messages are not selectively acknowledged or released - order
must be preserved in both cases.
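The sketch below illustrates both obligations with a hypothetical consumer loop and a broker-side `AckTracker`. The loop finishes processing before accepting each message, and the tracker rejects acknowledgements that arrive out of order. Raising an error is one option suggested in this thread; the proposal does not yet define how the broker should behave.

```python
from typing import Callable, Dict, List, Tuple

Msg = Tuple[str, int, str]  # (group id, position within group, body)


class OutOfOrderAck(Exception):
    """Raised when a consumer accepts a grouped message out of order."""


class AckTracker:
    """Hypothetical broker-side check that a group's messages are
    acknowledged in exactly the order they were delivered."""

    def __init__(self) -> None:
        self.pending: Dict[str, List[int]] = {}

    def delivered(self, msg: Msg) -> None:
        self.pending.setdefault(msg[0], []).append(msg[1])

    def ack(self, msg: Msg) -> None:
        group, seq, _ = msg
        waiting = self.pending.get(group, [])
        if not waiting or waiting[0] != seq:
            raise OutOfOrderAck(f"group {group!r}: ack of {seq} while expecting {waiting[:1]}")
        waiting.pop(0)


def consume(batch: List[Msg], tracker: AckTracker, process: Callable[[str], None]) -> None:
    for msg in batch:
        process(msg[2])   # 1. finish all work for the message first ...
        tracker.ack(msg)  # 2. ... then accept it, strictly in delivery order


batch = [("cart-42", 1, "add"), ("cart-42", 2, "remove")]
tracker = AckTracker()
for m in batch:
    tracker.delivered(m)
done: List[str] = []
consume(batch, tracker, done.append)
print(done)
```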

Note well that in the case of either of these proposed policies, distinct
message groups would not block each other from delivery.  For example, assume a
queue contains messages from two different message groups - say group "A" and
group "B" - and they are enqueued such that "A"'s messages are in front of
"B"'s. If the first message of group "A" is in the process of being consumed by a
client, then the remaining "A" messages are blocked, but the messages of the
"B" group are available for consumption - even though they are "behind" group "A"
in the queue.
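Non-blocking delivery between groups amounts to a scan that skips messages whose group is busy, instead of stopping at them. Here is a minimal sketch, where `busy_groups` is a hypothetical set of groups currently held by some consumer:

```python
from typing import Iterable, Optional, Set, Tuple

Msg = Tuple[str, str]  # (group id, body)


def next_available(queue: Iterable[Msg], busy_groups: Set[str]) -> Optional[Msg]:
    """Return the first message whose group is not held by another consumer.
    Messages of a busy group are skipped, not treated as a barrier."""
    for msg in queue:
        if msg[0] not in busy_groups:
            return msg
    return None


queue = [("A", "a2"), ("A", "a3"), ("B", "b1"), ("B", "b2")]
print(next_available(queue, busy_groups={"A"}))  # ('B', 'b1')
```

A real broker would probably avoid rescanning the whole queue, for example by indexing the head message of each group. The proposal leaves that implementation choice open.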


## Rationale


The solution described above allows an application to designate a set of
related messages, and provides policies for controlling the consumption of the
message group.

 * Goal: allow dynamic values for the group identifiers (no preconfiguration of identifiers necessary)
 * Goal: the number of message groups "in flight" should only be limited by the available resources (no need to configure a hard limit).
 * Goal: manageability: visibility into the message groups currently on a given queue
 * Goal: manageability: purge, move, reroute messages at the group level.
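The two manageability goals might look roughly like the helpers below: a per-group message count for visibility, and a group-level purge. These are hypothetical functions over an in-memory list, not an existing Qpid management API. Move and reroute would follow the same filter pattern.

```python
from collections import Counter
from typing import Dict, List, Tuple

Msg = Tuple[str, str]  # (group id, body)


def group_summary(queue: List[Msg]) -> Dict[str, int]:
    """Visibility: how many messages of each group are currently queued."""
    return dict(Counter(group for group, _ in queue))


def purge_group(queue: List[Msg], group: str) -> int:
    """Remove every queued message of `group` in place; return how many were removed."""
    before = len(queue)
    queue[:] = [msg for msg in queue if msg[0] != group]
    return before - len(queue)


queue = [("A", "a1"), ("B", "b1"), ("A", "a2")]
print(group_summary(queue))     # {'A': 2, 'B': 1}
print(purge_group(queue, "A"))  # 2
print(queue)                    # [('B', 'b1')]
```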

## Implementation Notes

 * Queues: support configuration of the group identifier header key.
 * Messages: provide access to group identifier.
 * Queues: identify head message of next available message group.
 * Queues: block the trailing messages in a given message group from being consumed.
 * Consumers: track the message group of the currently acquired message(s).
 * Clustering: ensure state is replicated within the cluster as needed.

## Consequences

 * __Development:__ No changes to the development process.
 * __Release:__ No changes to the release process.
 * __Documentation:__ User documentation will be needed to explain the feature, and the steps to configure and manage the new feature.
 * __Configuration:__ Yes: the group identifier header key is configurable per queue.  Queue state with regard to in-flight message groups needs to be visible.  Additional management methods are needed to purge/move/reroute a message group.
 * __Compatibility:__ Compatibility problems are unlikely: this is a new feature that would need to be enabled manually.  Applications wishing to use the feature would need to implement a message grouping policy, and ensure the processing of received message data is compliant with the desired policy.

## References

 * None.

## Contributor-in-Charge

Kenneth Giusti, <kg...@redhat.com>

## Contributors

 * Ted Ross, <tr...@redhat.com>

## Version

0.2







----- Original Message -----
> On 06/29/2011 11:16 AM, Gordon Sim wrote:
> > On 06/29/2011 11:08 AM, Alan Conway wrote:
> >> Gordon's mode 1 (stick to a consumer only as long as it has unacked
> >> messages) doesn't really do this: the only time it allows load to
> >> shift is when the queue is idle long enough for the consumer to ack
> >> all its pre-fetched messages. But if the queue is idle then we're not
> >> under heavy load which is where load balancing is most important.
> >
> > That's not necessarily true. The queue could have plenty of messages
> > on it and not be idle, but not have messages in a particular group.
>
> Good point!
>
> > Where I think this mode may make load balancing more adaptive is
> > where there is variability in the groups and the processing times of
> > messages.
> >
> > For example, consumer 1 gets assigned group 1 and processes 1
> > available message and then accepts it, there are no more messages at
> > present for that group. It then gets messages sent for groups 3, 5 and
> > 6 say. If those groups now get a lot of messages, consumer 1 may become
> > quite busy. If another set of messages for group 1 are published, why
> > not allow them to be processed by a different consumer? Assuming of
> > course that the requirement is simply in-order processing.
> >
> > I'm certainly not claiming this is perfect. However it seems to me
> > that it is not necessary in all cases to maintain the stickiness for
> > the lifetime of the consumer.
>
> Indeed, if load balancing over long-lived groups is the primary concern
> (e.g. stock symbols as groups) then it is important that groups do not
> stick to consumers for their lifetime.
>
> >> This makes me think there is a case for a 3rd mode of stickiness:
> >> non-stick mode. In this mode we randomly choose the next consumer when
> >> the current consumer acks. Clearly this will lower the possible
> >> msg/sec throughput since you are effectively doing synchronous
> >> messaging. However it may still be relevant where the msg/sec
> >> throughput is low anyway but the need for fair load balancing is
> >> high - e.g. very large messages or messages that require a long time
> >> to process. In that case the extra time added by synchronous messages
> >> may be negligible, but the automatic distribution of load is very
> >> valuable.
> >
> > Why not just set the prefetch on the consumers to 0 or 1, and have
> > the consumer accept each message as it is processed. That would seem
> > to achieve the same thing and is exactly what you would do to improve
> > the fairness of distribution without message groups (again at the
> > expense of throughput).
> 
> Excellent point. My non-stick mode is just your mode 2 + prefetch = 0.
> Very neat.
> 
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project: http://qpid.apache.org
> Use/Interact: mailto:dev-subscribe@qpid.apache.org



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
On 07/25/2011 10:03 AM, Gordon Sim wrote:
> On 07/25/2011 02:46 PM, Alan Conway wrote:
>> Response in line
>>
>> [snip]
>>> Ah, yes - sorry. The lifetime of the group's state depends on the type
>>> of "Policy" that is being used (see below). For the "Sequenced
>>> Consumers" policy - which is what I was thinking of in my last reply -
>>> the broker doesn't need to maintain state across all messages of the
>>> group. With that policy, the state can be dropped once there are no
>>> more messages for that group present in the broker.
>>>
>>
>> Again I don't get that. You're putting group boundaries at unpredictable
>> arbitrary points depending on relative speed of producer/consumer. The
>> producer may send the first N messages of a group containing N+M
>> messages, then the consumer consumes the first N messages. Now the queue
>> is empty so the next M messages are considered a new group, where N is
>> impossible to predict in advance. So you randomly cut up the groups.
>
> Cutting up the groups doesn't matter though. The requirement here is only that
> the messages be processed in order.

Of course.

> We only need to hold state regarding groups in order to prevent delivery of
> messages to one consumer when earlier messages of the same group have been given
> to another consumer who has not yet indicated that they have been processed.

Right, and for those groups we can use the termination of the consumer as the 
end of the group. It all fits.



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 07/25/2011 02:46 PM, Alan Conway wrote:
> Response in line
>
> [snip]
>> Ah, yes - sorry. The lifetime of the group's state depends on the type
>> of "Policy" that is being used (see below). For the "Sequenced
>> Consumers" policy - which is what I was thinking of in my last reply -
>> the broker doesn't need to maintain state across all messages of the
>> group. With that policy, the state can be dropped once there are no
>> more messages for that group present in the broker.
>>
>
> Again I don't get that. You're putting group boundaries at unpredictable
> arbitrary points depending on relative speed of producer/consumer. The
> producer may send the first N messages of a group containing N+M
> messages, then the consumer consumes the first N messages. Now the queue
> is empty so the next M messages are considered a new group, where N is
> impossible to predict in advance. So you randomly cut up the groups.

Cutting up the groups doesn't matter though. The requirement here is 
only that the messages be processed in order.

We only need to hold state regarding groups in order to prevent delivery 
of messages to one consumer when earlier messages of the same group have 
been given to another consumer who has not yet indicated that they have 
been processed.
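Gordon's point can be made concrete with a sketch of reference-counted group state. It follows the enqueue/dequeue counter Ken proposes in this thread and adds a count of acquired-but-unacknowledged messages. All names are hypothetical. When a group drains, the sketch deletes its entry. If more messages of that group arrive later, they start a fresh entry. This is harmless because every earlier message of the group has already been processed.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class GroupState:
    total: int = 0                # messages of the group still on the queue
    acquired: int = 0             # of those, delivered but not yet acknowledged
    holder: Optional[str] = None  # consumer allowed to take further messages


class GroupTable:
    """Per-group state for 'Sequenced Consumers' that exists only while the
    group has messages on the queue, and is dropped when the last is acked."""

    def __init__(self) -> None:
        self.groups: Dict[str, GroupState] = {}

    def enqueued(self, group: str) -> None:
        self.groups.setdefault(group, GroupState()).total += 1

    def acquire(self, group: str, consumer: str) -> bool:
        state = self.groups[group]
        if state.acquired and state.holder != consumer:
            return False  # an earlier message is still being processed elsewhere
        state.holder = consumer
        state.acquired += 1
        return True

    def acked(self, group: str) -> None:
        state = self.groups[group]
        state.acquired -= 1
        state.total -= 1
        if state.acquired == 0:
            state.holder = None     # the next message may go to any consumer
        if state.total == 0:
            del self.groups[group]  # nothing left: forget the group entirely


t = GroupTable()
t.enqueued("G")
t.enqueued("G")
first = t.acquire("G", "c1")    # True: G is now held by c1
blocked = t.acquire("G", "c2")  # False: c1 has not acknowledged yet
t.acquire("G", "c1")
t.acked("G")
t.acked("G")
dropped = "G" not in t.groups   # True: group drained, state discarded
t.enqueued("G")                 # a later message starts fresh state
later = t.acquire("G", "c2")    # True: still in order, c1 already finished
print(first, blocked, dropped, later)
```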

>> For the other proposed policy - Exclusive Consumer - then the group
>> state would be associated with a particular Consumer instance, and
>> remain present as long as the Consumer exists. So in that case, yes -
>> there is the potential that an unbounded number of group states could
>> exist without some kind of reclaim strategy. An end-of-group marker
>> could be used, but that wouldn't prevent a DOS by a misbehaving app
>> that creates endless groups w/o setting the end marker. Perhaps a hard
>> max-groups-per-Consumer limit, or a TTL for idle groups?
>
> I'm not so worried about that, it's no worse than the fact that a client
> can create a huge number of of sessions or consumers etc. Solving that
> family of DOS attacks is a separate issue I think.

I agree. I'd leave worrying about this for a later stage.



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
Response in line

[snip]
>>>
>>> In this case, it probably would be sufficient to maintain a counter
>>> in the group's state. I'm assuming that we'll need a group lookup on
>>> message arrival to find that state (or create it on the first
>>> received message). Increment the counter when a message in that
>>> group is enqueued, decrement when the message is dequeued. If
>>> decremented to zero, delete the group. Performance impact of such an
>>> approach is TBD, of course - probably need to pool the state objects
>>> to limit memory thrashing, etc.
>>>
>>
>> That doesn't sound right. The first N messages of a group could be
>> dequeued before the (N+1)th message is enqueued, which would put your
>> counter to 0 before the entire group is processed. I think we need an
>> explicit end of group marker on the last message in the group to allow
>> the user to close a group in a scenario with lots of dynamic group ids.
>> You could ignore this in cases where there is a small fixed set of
>> groups that are used over a long time.
>>
>
>
> Ah, yes - sorry.   The lifetime of the group's state depends on the type of "Policy" that is being used (see below).  For the "Sequenced Consumers" policy - which is what I was thinking of in my last reply - the broker doesn't need to maintain state across all messages of the group.  With that policy, the state can be dropped once there are no more messages for that group present in the broker.
>

Again I don't get that. You're putting group boundaries at unpredictable 
arbitrary points depending on relative speed of producer/consumer. The producer 
may send the first N messages of a group containing N+M messages, then the 
consumer consumes the first N messages. Now the queue is empty so the next M 
messages are considered a new group, where N is impossible to predict in 
advance. So you randomly cut up the groups.

> For the other proposed policy - Exclusive Consumer - then the group state would be associated with a particular Consumer instance, and remain present as long as the Consumer exists.  So in that case, yes - there is the potential that an unbounded number of group states could exist without some kind of reclaim strategy.   An end-of-group marker could be used, but that wouldn't prevent a DOS by a misbehaving app that creates endless groups w/o setting the end marker.  Perhaps a hard max-groups-per-Consumer limit, or a TTL for idle groups?

I'm not so worried about that, it's no worse than the fact that a client can 
create a huge number of sessions or consumers etc. Solving that family of DOS 
attacks is a separate issue I think.

>
>
>
>>>
>>>
>>>>> For #2:
>>>>>
>>>>> The key for the header that contains the group identifier would be
>>>>> provided to
>>>>> the broker via configuration.
>>>>
>>>> A fixed name like 'qpid.group' seems less likely to confuse. Perhaps
>>>> have the id default to 'qpid.group' but be configurable to another
>>>> name in the (unlikely?) event that it is required.
>>>>
>>>
>>> That's a good idea. The current AMQP 1.0 draft defines a message
>>> property header called "group-id", perhaps we should use
>>> "qpid.group-id" as the default instead of "qpid.group"?
>>
>> Should we be using the AMQP group-id? What's it for?
>>
>
>
> It appears to be for the exact same purpose - identify the group a message belongs to.  Set by the application and preserved to destination.  But I don't see any specific use cases proposed in the spec.
>
>
>>
>>>>> From the broker's perspective, the number of different group id
>>>>> values would be unlimited. And the value of group identifiers would
>>>>> not be provided ahead of time by configuration: the broker must learn
>>>>> them at runtime.
>>>>>
>>>>> For #3:
>>>>>
>>>>> This QIP defines two message group policies. Additional policies may
>>>>> be defined in the future.
>>>>>
>>>>> Policy 1: Exclusive Consumer
>>>>>
>>>>> With this policy, the broker would guarantee that all messages in a
>>>>> group would be delivered to the same client.
>>>>>
>>>>> This policy would be configured on a per-queue basis.
>>
>> We could configure policy on a per-group basis with the value of the
>> "qpid.group" property. E.g. qpid.group=exclusive. That's more flexible
>> and puts control of the policy in the hands of the producer, rather
>> than requiring an admin convention that associates a queue name with a
>> policy.
>>
>
> I'm not sure I completely understand: are you proposing the producer sets both the group identifier and a policy type identifier on a per-message basis?
>
>
>>>>> When the first message of a new message group becomes available
>>>>> for delivery, the broker will associate that group with the next
>>>>> available consumer. The broker would then guarantee that all messages
>>>>> from that group are delivered to that consumer only.
>>>>>
>>>>> The broker will maintain the group/client association for the
>>>>> lifetime of the client. Should the client die or cancel its
>>>>> subscription, any unacknowledged messages in the group will be
>>>>> assigned to a different client (preserving message order).
>>>>> Group/client associations are not maintained across broker restart.
>>>>> These associations must be replicated in a clustered broker.
>>>>>
>>>>>
>>>>> Policy #2: Sequenced Consumers
>>>>>
>>>>> With this policy, the broker would guarantee that the order in which
>>>>> messages in a group are processed by consumers is the same order in
>>>>> which the messages were enqueued. This guarantee would be upheld even
>>>>> if the messages of the group are processed by different consumers. No
>>>>> two messages from the same group would be processed in parallel by
>>>>> different consumers.
>>>>>
>>>>> Specifically, for any given group, the broker allows only the first N
>>>>> messages in the group to be available for delivery to a particular
>>>>> consumer. The value of N would be determined by the selected
>>>>> consumer's configured prefetch capacity. The broker blocks access to
>>>>> the remaining messages in that group by any other consumer. Once the
>>>>> selected consumer has acknowledged that first set of delivered
>>>>> messages, the broker allows the next messages in the group to be
>>>>> available for delivery. The next set of messages may be delivered to
>>>>> a different consumer.
>>>>>
>>>>> This policy would be configured on a per-queue basis. Configuration
>>>>> would include designating the key of the application header that
>>>>> specifies the group id.
>>>>>
>>>>> Note well that, with this policy, the consuming application has to:
>>>>>
>>>>> 1. ensure that it has completely processed the data in a received
>>>>> message before accepting that message, as described in Section 2.6.2,
>>>>> "Transfer of Responsibility", of the AMQP 0-10 specification.
>>>>>
>>>>> 2. ensure that messages are not selectively acknowledged or released -
>>>>> order must be preserved in both cases.
>>>>
>>>> What happens if the consuming application fails to respect these
>>>> requirements?
>>>>
>>>
>>> Hmmm... good question. My initial thoughts are that this would be
>>> considered an application error, since preserving order is important
>>> for this feature. Perhaps the broker should respond with an error
>>> exception?
>>>
>>>
>>>>
>>>>> [snip]
>>>>
>>>> ---------------------------------------------------------------------
>>>> Apache Qpid - AMQP Messaging Implementation
>>>> Project: http://qpid.apache.org
>>>> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>>>
>>> ---------------------------------------------------------------------
>>> Apache Qpid - AMQP Messaging Implementation
>>> Project: http://qpid.apache.org
>>> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>>>
>>
>> ---------------------------------------------------------------------
>> Apache Qpid - AMQP Messaging Implementation
>> Project: http://qpid.apache.org
>> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
Sorry, didn't read entire mail last time...

On 07/20/2011 01:39 PM, Ken Giusti wrote:
[snip]
>>>
>>> In this case, it probably would be sufficient to maintain a counter
>>> in the group's state. I'm assuming that we'll need a group lookup on
>>> message arrival to find that state (or create it on the first
>>> received message). Increment the counter when a message in that
>>> group is enqueued, decrement when the message is dequeued. If
>>> decremented to zero, delete the group. Performance impact of such an
>>> approach is TBD, of course - probably need to pool the state objects
>>> to limit memory thrashing, etc.
>>>
>>
>> That doesn't sound right. The first N messages of a group could be
>> dequeued before the (N+1)th message is enqueued, which would put your
>> counter to 0 before the entire group is processed. I think we need an
>> explicit end of group marker on the last message in the group to allow
>> the user to close a group in a scenario with lots of dynamic group ids.
>> You could ignore this in cases where there is a small fixed set of
>> groups that are used over a long time.
>>
>
>
> Ah, yes - sorry.   The lifetime of the group's state depends on the type of "Policy" that is being used (see below).  For the "Sequenced Consumers" policy - which is what I was thinking of in my last reply - the broker doesn't need to maintain state across all messages of the group.  With that policy, the state can be dropped once there are no more messages for that group present in the broker.
>

Again I don't get that. You're putting group boundaries at unpredictable 
arbitrary points depending on relative speed of producer/consumer. The producer 
may send the first N messages of a group containing N+M messages, then the 
consumer consumes the first N messages. Now the queue is empty so the next M 
messages are considered a new group, where N is impossible to predict in 
advance. So you randomly cut up the groups.

> For the other proposed policy - Exclusive Consumer - then the group state would be associated with a particular Consumer instance, and remain present as long as the Consumer exists.  So in that case, yes - there is the potential that an unbounded number of group states could exist without some kind of reclaim strategy.   An end-of-group marker could be used, but that wouldn't prevent a DOS by a misbehaving app that creates endless groups w/o setting the end marker.  Perhaps a hard max-groups-per-Consumer limit, or a TTL for idle groups?

I'm not so worried about that, it's no worse than the fact that a client can 
create a huge number of sessions or consumers etc. Solving that family of DOS 
attacks is a separate issue I think.

[snip]
>>>>> This QIP defines two message group policies. Additional policies may
>>>>> be defined in the future.
>>>>>
>>>>> Policy 1: Exclusive Consumer
>>>>>
>>>>> With this policy, the broker would guarantee that all messages in a
>>>>> group would be delivered to the same client.
>>>>>
>>>>> This policy would be configured on a per-queue basis.
>>
>> We could configure policy on a per-group basis with the value of the
>> "qpid.group" property. E.g. qpid.group=exclusive. That's more flexible
>> and puts control of the policy in the hands of the producer, rather
>> than requiring an admin convention that associates a queue name with a
>> policy.
>>
>
> I'm not sure I completely understand: are you proposing the producer sets both the group identifier and a policy type identifier on a per-message basis?

As above, I think we need an explicit "end group" message, so it would be 
symmetric to also have an explicit "start group" message that specifies the 
policy. This could be done with multiple properties (but that makes configuring 
the property name more complicated) or with a configuration string, e.g. 
group-id=mygroup{policy=xxx} or somesuch. I'm not sure which approach I prefer; 
both are a little icky.
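The second option becomes more concrete with a sketch of parsing such a value. The `mygroup{policy=xxx}` syntax is only Alan's illustrative "somesuch". The comma-separated `key=value` options and the error handling are assumptions.

```python
import re
from typing import Dict, Tuple

# Hypothetical syntax: "<group id>" or "<group id>{key=value,key=value}"
_GROUP_VALUE = re.compile(r"^(?P<id>[^{}]+?)(?:\{(?P<opts>[^{}]*)\})?$")


def parse_group_value(value: str) -> Tuple[str, Dict[str, str]]:
    """Split 'mygroup{policy=exclusive}' into ('mygroup', {'policy': 'exclusive'})."""
    match = _GROUP_VALUE.match(value.strip())
    if match is None:
        raise ValueError(f"malformed group value: {value!r}")
    options: Dict[str, str] = {}
    if match.group("opts"):
        for item in match.group("opts").split(","):
            key, sep, val = item.partition("=")
            if not sep or not key.strip():
                raise ValueError(f"malformed option {item!r} in {value!r}")
            options[key.strip()] = val.strip()
    return match.group("id").strip(), options


print(parse_group_value("mygroup{policy=exclusive}"))  # ('mygroup', {'policy': 'exclusive'})
print(parse_group_value("orders"))                     # ('orders', {})
```

The single-property form keeps the configurable header name simple, but it puts a small syntax inside the header value. That trade-off is the "icky" part Alan mentions.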



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Ken Giusti <kg...@redhat.com>.
Responses inline:

-K

----- Original Message -----
> On 07/20/2011 11:57 AM, Ken Giusti wrote:
> >
> > Alan - thanks for the input, replies inline....
> >
> > -K
> >
> > ----- Original Message -----
> >> On 07/01/2011 07:04 AM, Ken Giusti wrote:
> >>> Folks,
> >>>
> >>> Here's a second draft of the proposal. I've tried to incorporate the
> >>> feedback provided during the last week. And I have tried to limit the
> >>> proposed feature set a bit more.
> >>>
> >>> Opinions welcome, thanks -
> >>>
> >>> -K
> >>>
> >>>
> >>> # Message Groups
> >>>
> >>> ## Status
> >>>
> >>> Draft
> >>>
> >>> ## Summary
> >>>
> >>> This document describes a new feature that would allow an application to
> >>> classify a set of related messages as belonging to a group. This document
> >>> also describes two policies that the broker could apply when delivering a
> >>> message group to one or more consumers.
> >>>
> >>>
> >>> ## Problem
> >>>
> >>> It would be useful to give an application the ability to classify a set of
> >>> messages as belonging to a single unit of work. Furthermore, if the broker
> >>> can identify messages belonging to the same unit of work, it can enforce
> >>> policies that control how that unit of work can be consumed.
> >>>
> >>> For example, it may be desirable to guarantee that a particular set of
> >>> messages are consumed by the same client, even in the case where there are
> >>> multiple clients consuming from the same queue.
> >>>
> >>> In a different scenario, it may be permissible for different clients to
> >>> consume messages from the same group, as long as it can be guaranteed that
> >>> the individual messages are not processed in parallel. In other words, the
> >>> broker would ensure that messages are processed by consumers in the same
> >>> order in which they were enqueued.
> >>>
> >>> For example, assume we have a shopping application that manages items in
> >>> a virtual shopping cart. A user may add an item to their shopping cart,
> >>> then change their mind and remove it. If the application sends an "add"
> >>> message to the broker, immediately followed by a "remove" message, they
> >>> will be queued in the proper order - "add", then "remove".
> >>>
> >>> However, if there are multiple consumers, it is possible that once a
> >>> consumer acquires the "add" message, a different consumer may acquire the
> >>> "remove" message. This allows both messages to be processed in parallel,
> >>> which could result in the "remove" operation being performed before the
> >>> "add" operation.
> >>>
> >>>
> >>> ## Solution
> >>>
> >>> This QIP proposes the following:
> >>>
> >>> 1) provide the ability for a message producer to designate a set of
> >>> messages as belonging to the same group.
> >>>
> >>> 2) allow the broker to identify messages that belong to the same group.
> >>>
> >>> 3) define policies for the broker that control the delivery of messages
> >>> belonging to the same group.
> >>>
> >>> For #1:
> >>>
> >>> The sending application would define a message header that would contain
> >>> the message's group identifier. The group identifier stored in that header
> >>> field would be a string value determined by the application. Messages from
> >>> the same group would have the same group identifier value.
> >>
> >> The broker will likely have to keep state associated with a group. How
> >> does the broker know when a group ends so it can reclaim those resources?
> >>
> >
> >
> > In this case, it probably would be sufficient to maintain a counter
> > in the group's state. I'm assuming that we'll need a group lookup on
> > message arrival to find that state (or create it on the first
> > received message). Increment the counter when a message in that
> > group is enqueued, decrement when the message is dequeued. If
> > decremented to zero, delete the group. Performance impact of such an
> > approach is TBD, of course - probably need to pool the state objects
> > to limit memory thrashing, etc....
> >
> 
> That doesn't sound right. The first N messages of a group could be dequeued
> before the (N+1)th message is enqueued, which would put your counter to 0
> before the entire group is processed. I think we need an explicit
> end-of-group marker on the last message in the group to allow the user to
> close a group in a scenario with lots of dynamic group ids. You could ignore
> this in cases where there is a small fixed set of groups that are used over
> a long time.
> 


Ah, yes, sorry. The lifetime of the group's state depends on the type of "Policy" that is being used (see below). For the "Sequenced Consumers" policy, which is what I was thinking of in my last reply, the broker doesn't need to maintain state across all messages of the group. With that policy, the state can be dropped once there are no more messages for that group present in the broker.

For the other proposed policy, Exclusive Consumer, the group state would be associated with a particular Consumer instance and would remain present as long as the Consumer exists. So in that case, yes, an unbounded number of group states could accumulate without some kind of reclaim strategy. An end-of-group marker could be used, but that wouldn't prevent a DoS by a misbehaving app that creates endless groups without setting the end marker. Perhaps a hard max-groups-per-Consumer limit, or a TTL for idle groups?
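The two reclaim ideas (a per-consumer limit and an idle TTL) can coexist with an end-of-group marker. Below is a minimal Python sketch under stated assumptions: one table per consumer, an injectable clock so the logic can be tested, and hypothetical names (`ConsumerGroupTable`, `max_groups`, `idle_ttl`) that do not correspond to any existing qpidd option.

```python
import time
from collections import OrderedDict
from typing import Callable


class ConsumerGroupTable:
    """Group bindings held by one consumer under the Exclusive Consumer policy.

    Hypothetical sketch: the limit, the TTL and every name here are
    assumptions for illustration, not existing qpidd options.
    """

    def __init__(self, max_groups: int, idle_ttl: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_groups = max_groups
        self.idle_ttl = idle_ttl
        self.clock = clock
        # group id -> time of last activity; kept in least-recently-active order
        self._last_seen: "OrderedDict[str, float]" = OrderedDict()

    def expire_idle(self) -> None:
        """Forget groups that have been idle for at least idle_ttl seconds."""
        now = self.clock()
        while self._last_seen:
            group_id, seen = next(iter(self._last_seen.items()))
            if now - seen < self.idle_ttl:
                break  # every later entry is more recent
            del self._last_seen[group_id]

    def touch(self, group_id: str) -> bool:
        """Record activity on a group; False means the per-consumer limit is reached."""
        self.expire_idle()
        if group_id not in self._last_seen and len(self._last_seen) >= self.max_groups:
            return False  # caller decides: reject, or bind the group elsewhere
        self._last_seen[group_id] = self.clock()
        self._last_seen.move_to_end(group_id)
        return True

    def end_group(self, group_id: str) -> None:
        """An explicit end-of-group marker releases the binding at once."""
        self._last_seen.pop(group_id, None)

    def __contains__(self, group_id: str) -> bool:
        return group_id in self._last_seen
```

What a full table should do (reject the message, or bind the new group to another consumer) is left open here, as it is in the thread; the sketch only reports the condition.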



> >
> >
> >>> For #2:
> >>>
> >>> The key for the header that contains the group identifier would be
> >>> provided to
> >>> the broker via configuration.
> >>
> >> A fixed name like 'qpid.group' seems less likely to confuse. Perhaps have
> >> it default to 'qpid.group' but be configurable to another name in the
> >> (unlikely?) event that it is required.
> >>
> >
> > That's a good idea. The current AMQP 1.0 draft defines a message
> > property header called "group-id", perhaps we should use
> > "qpid.group-id" as the default instead of "qpid.group"?
> 
> Should we be using the AMQP group-id? What's it for?
> 


It appears to serve exactly the same purpose: identifying the group a message belongs to. It is set by the application and preserved through to the destination. But I don't see any specific use cases proposed in the spec.


> 
> >>> From the broker's perspective, the number of different group id values
> >>> would be unlimited. And the value of group identifiers would not be
> >>> provided ahead of time by configuration: the broker must learn them at
> >>> runtime.
> >>>
> >>> For #3:
> >>>
> >>> This QIP defines two message group policies. Additional policies may be
> >>> defined in the future.
> >>>
> >>> Policy 1: Exclusive Consumer
> >>>
> >>> With this policy, the broker would guarantee that all messages in a group
> >>> would be delivered to the same client.
> >>>
> >>> This policy would be configured on a per-queue basis.
> 
> We could configure policy on a per-group basis with the value of the
> "qpid.group" property. E.g. qpid.group=exclusive.
> That's more flexible and puts control of the policy in the hands of the
> producer, rather than requiring an admin convention that associates a queue
> name with a policy.
> 

I'm not sure I completely understand: are you proposing the producer sets both the group identifier and a policy type identifier on a per-message basis?


> >>> When the first message of a new message group becomes available for
> >>> delivery, the broker will associate that group with the next available
> >>> consumer. The broker would then guarantee that all messages from that
> >>> group are delivered to that consumer only.
> >>>
> >>> The broker will maintain the group/client association for the lifetime of
> >>> the client. Should the client die or cancel its subscription, any
> >>> unacknowledged messages in the group will be assigned to a different
> >>> client (preserving message order). Group/client associations are not
> >>> maintained across broker restart. These associations must be replicated
> >>> in a clustered broker.
> >>>
> >>>
> >>> Policy #2: Sequenced Consumers
> >>>
> >>> With this policy, the broker would guarantee that the order in which
> >>> messages in a group are processed by consumers is the same order in which
> >>> the messages were enqueued. This guarantee would be upheld even if the
> >>> messages of the group are processed by different consumers. No two
> >>> messages from the same group would be processed in parallel by different
> >>> consumers.
> >>>
> >>> Specifically, for any given group, the broker allows only the first N
> >>> messages in the group to be available for delivery to a particular
> >>> consumer. The value of N would be determined by the selected consumer's
> >>> configured prefetch capacity. The broker blocks access to the remaining
> >>> messages in that group by any other consumer. Once the selected consumer
> >>> has acknowledged that first set of delivered messages, the broker allows
> >>> the next messages in the group to be available for delivery. The next set
> >>> of messages may be delivered to a different consumer.
> >>>
> >>> This policy would be configured on a per-queue basis. Configuration would
> >>> include designating the key of the application header that specifies the
> >>> group id.
> >>>
> >>> Note well that, with this policy, the consuming application has to:
> >>>
> >>> 1. ensure that it has completely processed the data in a received message
> >>> before accepting that message, as described in Section 2.6.2 ("Transfer
> >>> of Responsibility") of the AMQP-0.10 specification.
> >>>
> >>> 2. ensure that messages are not selectively acknowledged or released -
> >>> order must be preserved in both cases.
> >>
> >> What happens if the consuming application fails to respect these
> >> requirements?
> >>
> >
> > Hmmm... good question. My initial thoughts are that this would be
> > considered an application error, since preserving order is important
> > for this feature. Perhaps the broker should respond with an error
> > exception?
> >
> >
> >>
> >>> Note well that in the case of either of these proposed policies, distinct
> >>> message groups would not block each other from delivery. For example,
> >>> assume a queue contains messages from two different message groups - say
> >>> group "A" and group "B" - and they are enqueued such that "A"'s messages
> >>> are in front of "B". If the first message of group "A" is in the process
> >>> of being consumed by a client, then the remaining "A" messages are
> >>> blocked, but the messages of the "B" group are available for
> >>> consumption, even though it is "behind" group "A" in the queue.
> >>>
> >>>
> >>> ## Rationale
> >>>
> >>>
> >>> The solution described above allows an application to designate a set of
> >>> related messages, and provides policies for controlling the consumption
> >>> of the message group.
> >>>
> >>>    * Goal: allow dynamic values for the group identifiers (no
> >>>    preconfiguration of identifiers necessary)
> >>>    * Goal: the number of message groups "in flight" should only be
> >>>    limited by the available resources (no need to configure a hard
> >>>    limit).
> >>>    * Goal: manageability: visibility into the message groups
> >>>    currently on a given queue
> >>>    * Goal: manageability: purge, move, reroute messages at the group
> >>>    level.
> >>>
> >>> ## Implementation Notes
> >>>
> >>>    * Queues: support configuration of the group identifier header key.
> >>>    * Messages: provide access to group identifier.
> >>>    * Queues: identify head message of next available message group.
> >>>    * Queues: block the trailing messages in a given message group
> >>>    from being consumed.
> >>>    * Consumers: track the message group of the currently acquired
> >>>    message(s).
> >>>    * Clustering: ensure state is replicated within the cluster as
> >>>    needed.
> >>>
> >>> ## Consequences
> >>>
> >>>    * __Development:__ No changes to the development process.
> >>>    * __Release:__ No changes to the release process.
> >>>    * __Documentation:__ User documentation will be needed to explain
> >>>    the feature, and the steps to configure and manage the new feature.
> >>>    * __Configuration:__ Yes: per-queue group identifier header key is
> >>>    configurable. Queue state with regard to in-flight message groups
> >>>    needs to be visible. Additional methods to purge/move/reroute a
> >>>    message group.
> >>>    * __Compatibility:__ Unlikely - new feature that would need to be
> >>>    enabled manually. Applications wishing to use the feature would
> >>>    need to implement a message grouping policy, and ensure the
> >>>    processing of received message data is compliant with the desired
> >>>    policy.
> >>>
> >>> ## References
> >>>
> >>>    * None.
> >>>
> >>> ## Contributor-in-Charge
> >>>
> >>> Kenneth Giusti,<kg...@redhat.com>
> >>>
> >>> ## Contributors
> >>>
> >>>    * Ted Ross,<tr...@redhat.com>
> >>>
> >>> ## Version
> >>>
> >>> 0.2
> >>>
> >>>
> >>


Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
On 07/20/2011 11:57 AM, Ken Giusti wrote:
>
> Alan - thanks for the input, replies inline....
>
> -K
>
> ----- Original Message -----
>> On 07/01/2011 07:04 AM, Ken Giusti wrote:
>>> Folks,
>>>
>>> Here's a second draft of the proposal. I've tried to incorporate the
>>> feedback provided during the last week. And I have tried to limit
>>> the proposed feature set a bit more.
>>>
>>> Opinions welcome, thanks -
>>>
>>> -K
>>>
>>>
>>> # Message Groups
>>>
>>> ## Status
>>>
>>> Draft
>>>
>>> ## Summary
>>>
>>> This document describes a new feature that would allow an application to
>>> classify a set of related messages as belonging to a group. This document
>>> also describes two policies that the broker could apply when delivering a
>>> message group to one or more consumers.
>>>
>>>
>>> ## Problem
>>>
>>> It would be useful to give an application the ability to classify a set of
>>> messages as belonging to a single unit of work. Furthermore, if the broker
>>> can identify messages belonging to the same unit of work, it can enforce
>>> policies that control how that unit of work can be consumed.
>>>
>>> For example, it may be desirable to guarantee that a particular set of
>>> messages are consumed by the same client, even in the case where there are
>>> multiple clients consuming from the same queue.
>>>
>>> In a different scenario, it may be permissible for different clients to
>>> consume messages from the same group, as long as it can be guaranteed that
>>> the individual messages are not processed in parallel. In other words, the
>>> broker would ensure that messages are processed by consumers in the same
>>> order in which they were enqueued.
>>>
>>> For example, assume we have a shopping application that manages items in
>>> a virtual shopping cart. A user may add an item to their shopping cart,
>>> then change their mind and remove it. If the application sends an "add"
>>> message to the broker, immediately followed by a "remove" message, they
>>> will be queued in the proper order - "add", then "remove".
>>>
>>> However, if there are multiple consumers, it is possible that once a
>>> consumer acquires the "add" message, a different consumer may acquire the
>>> "remove" message. This allows both messages to be processed in parallel,
>>> which could result in the "remove" operation being performed before the
>>> "add" operation.
>>>
>>>
>>> ## Solution
>>>
>>> This QIP proposes the following:
>>>
>>> 1) provide the ability for a message producer to designate a set of
>>> messages as belonging to the same group.
>>>
>>> 2) allow the broker to identify messages that belong to the same group.
>>>
>>> 3) define policies for the broker that control the delivery of messages
>>> belonging to the same group.
>>>
>>> For #1:
>>>
>>> The sending application would define a message header that would contain
>>> the message's group identifier. The group identifier stored in that header
>>> field would be a string value determined by the application. Messages from
>>> the same group would have the same group identifier value.
>>
>> The broker will likely have to keep state associated with a group. How
>> does the broker know when a group ends so it can reclaim those resources?
>>
>
>
> In this case, it would probably be sufficient to maintain a counter in the group's state. I'm assuming that we'll need a group lookup on message arrival to find that state (or create it on the first received message). Increment the counter when a message in that group is enqueued, and decrement it when the message is dequeued. If it is decremented to zero, delete the group. The performance impact of such an approach is TBD, of course; we would probably need to pool the state objects to limit memory thrashing, etc.
>

That doesn't sound right. The first N messages of a group could be dequeued 
before the N+1 message is enqueued, which would put your counter to 0 before the 
entire group is processed. I think we need an explicit end of group marker on 
the last message in the group to allow the user to close a group in a scenario 
with lots of dynamic group ids. You could ignore this in cases where there is a 
small fixed set of groups that are used over a long time.
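The objection can be made concrete with a small simulation of the Exclusive Consumer policy. In the Python sketch below everything is hypothetical (the `Msg` type, the `end_of_group` flag and both binding classes are illustrations, not broker code), and it assumes an unbound group goes to whichever consumer asks next. With a pure enqueue/dequeue counter the binding vanishes whenever the queue drains between two messages of a group, so the second message reaches a different consumer; with an explicit end-of-group marker the binding survives until the producer closes the group.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Msg:
    group: str
    end_of_group: bool = False  # hypothetical marker set on a group's last message


class CountingBindings:
    """Group -> consumer bindings that live only while the group has queued messages."""

    def __init__(self) -> None:
        self.count: Dict[str, int] = {}
        self.owner: Dict[str, str] = {}

    def enqueue(self, msg: Msg) -> None:
        self.count[msg.group] = self.count.get(msg.group, 0) + 1

    def dequeue(self, msg: Msg, consumer: str) -> None:
        self.owner.setdefault(msg.group, consumer)
        self.count[msg.group] -= 1
        if self.count[msg.group] == 0:
            del self.count[msg.group]
            self.owner.pop(msg.group, None)  # binding lost while the group may continue


class MarkerBindings(CountingBindings):
    """Bindings released only by an explicit end-of-group marker."""

    def dequeue(self, msg: Msg, consumer: str) -> None:
        self.owner.setdefault(msg.group, consumer)
        self.count[msg.group] -= 1
        if self.count[msg.group] == 0:
            del self.count[msg.group]
        if msg.end_of_group:
            self.owner.pop(msg.group, None)


def replay(bindings: CountingBindings) -> List[str]:
    """Send two messages of group "g", letting the queue drain in between.

    A bound group goes to its owner; an unbound one goes to whichever
    consumer asks next (c1 first, then c2). Returns who received each message.
    """
    received: List[str] = []
    for asker, msg in (("c1", Msg("g")), ("c2", Msg("g", end_of_group=True))):
        bindings.enqueue(msg)
        target = bindings.owner.get(msg.group, asker)
        bindings.dequeue(msg, target)
        received.append(target)
    return received
```

Here `replay(CountingBindings())` returns `["c1", "c2"]`, breaking exclusivity, while `replay(MarkerBindings())` returns `["c1", "c1"]` and leaves no group state behind once the marked message is consumed.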

>
>
>>> For #2:
>>>
>>> The key for the header that contains the group identifier would be
>>> provided to
>>> the broker via configuration.
>>
>> A fixed name like 'qpid.group' seems less likely to confuse. Perhaps have
>> it default to 'qpid.group' but be configurable to another name in the
>> (unlikely?) event that it is required.
>>
>
> That's a good idea.  The current AMQP 1.0 draft defines a message property header called "group-id", perhaps we should use "qpid.group-id" as the default instead of "qpid.group"?

Should we be using the AMQP group-id? What's it for?


>>> From the broker's perspective, the number of different group id values
>>> would be unlimited. And the value of group identifiers would not be
>>> provided ahead of time by configuration: the broker must learn them at
>>> runtime.
>>>
>>> For #3:
>>>
>>> This QIP defines two message group policies. Additional policies may be
>>> defined in the future.
>>>
>>> Policy 1: Exclusive Consumer
>>>
>>> With this policy, the broker would guarantee that all messages in a group
>>> would be delivered to the same client.
>>>
>>> This policy would be configured on a per-queue basis.

We could configure policy on a per-group basis with the value of the 
"qpid.group" property. E.g. qpid.group=exclusive.
That's more flexible and puts control of the policy in the hands of the 
producer, rather than requiring an admin convention that associates a queue name 
with a policy.

>>> When the first message of a new message group becomes available for
>>> delivery, the broker will associate that group with the next available
>>> consumer. The broker would then guarantee that all messages from that
>>> group are delivered to that consumer only.
>>>
>>> The broker will maintain the group/client association for the lifetime of
>>> the client. Should the client die or cancel its subscription, any
>>> unacknowledged messages in the group will be assigned to a different
>>> client (preserving message order). Group/client associations are not
>>> maintained across broker restart. These associations must be replicated
>>> in a clustered broker.
>>>
>>>
>>> Policy #2: Sequenced Consumers
>>>
>>> With this policy, the broker would guarantee that the order in which
>>> messages in a group are processed by consumers is the same order in which
>>> the messages were enqueued. This guarantee would be upheld even if the
>>> messages of the group are processed by different consumers. No two
>>> messages from the same group would be processed in parallel by different
>>> consumers.
>>>
>>> Specifically, for any given group, the broker allows only the first N
>>> messages in the group to be available for delivery to a particular
>>> consumer. The value of N would be determined by the selected consumer's
>>> configured prefetch capacity. The broker blocks access to the remaining
>>> messages in that group by any other consumer. Once the selected consumer
>>> has acknowledged that first set of delivered messages, the broker allows
>>> the next messages in the group to be available for delivery. The next set
>>> of messages may be delivered to a different consumer.
>>>
>>> This policy would be configured on a per-queue basis. Configuration would
>>> include designating the key of the application header that specifies the
>>> group id.
>>>
>>> Note well that, with this policy, the consuming application has to:
>>>
>>> 1. ensure that it has completely processed the data in a received message
>>> before accepting that message, as described in Section 2.6.2 ("Transfer
>>> of Responsibility") of the AMQP-0.10 specification.
>>>
>>> 2. ensure that messages are not selectively acknowledged or released -
>>> order must be preserved in both cases.
>>
>> What happens if the consuming application fails to respect these
>> requirements?
>>
>
> Hmmm... good question.  My initial thoughts are that this would be considered an application error, since preserving order is important for this feature.  Perhaps the broker should respond with an error exception?
>
>
>>
>>> Note well that in the case of either of these proposed policies, distinct
>>> message groups would not block each other from delivery. For example,
>>> assume a queue contains messages from two different message groups - say
>>> group "A" and group "B" - and they are enqueued such that "A"'s messages
>>> are in front of "B". If the first message of group "A" is in the process
>>> of being consumed by a client, then the remaining "A" messages are
>>> blocked, but the messages of the "B" group are available for
>>> consumption, even though it is "behind" group "A" in the queue.
>>>
>>>
>>> ## Rationale
>>>
>>>
>>> The solution described above allows an application to designate a set of
>>> related messages, and provides policies for controlling the consumption
>>> of the message group.
>>>
>>>    * Goal: allow dynamic values for the group identifiers (no
>>>    preconfiguration of identifiers necessary)
>>>    * Goal: the number of message groups "in flight" should only be
>>>    limited by the available resources (no need to configure a hard
>>>    limit).
>>>    * Goal: manageability: visibility into the message groups
>>>    currently on a given queue
>>>    * Goal: manageability: purge, move, reroute messages at the group
>>>    level.
>>>
>>> ## Implementation Notes
>>>
>>>    * Queues: support configuration of the group identifier header
>>>    key.
>>>    * Messages: provide access to group identifier.
>>>    * Queues: identify head message of next available message group.
>>>    * Queues: block the trailing messages in a given message group
>>>    from being consumed.
>>>    * Consumers: track the message group of the currently acquired
>>>    message(s).
>>>    * Clustering: ensure state is replicated within the cluster as
>>>    needed.
>>>
>>> ## Consequences
>>>
>>>    * __Development:__ No changes to the development process.
>>>    * __Release:__ No changes to the release process.
>>>    * __Documentation:__ User documentation will be needed to explain
>>>    the feature, and the steps to configure and manage the new
>>>    feature.
>>>    * __Configuration:__ Yes: per-queue group identifier header key is
>>>    configurable. Queue state with regard to in-flight message groups
>>>    needs to be visible. Additional methods to purge/move/reroute a
>>>    message group.
>>>    * __Compatibility:__ Unlikely - new feature that would need to be
>>>    enabled manually. Applications wishing to use the feature would
>>>    need to implement a message grouping policy, and ensure the
>>>    processing of received message data is compliant with the desired
>>>    policy.
>>>
>>> ## References
>>>
>>>    * None.
>>>
>>> ## Contributor-in-Charge
>>>
>>> Kenneth Giusti,<kg...@redhat.com>
>>>
>>> ## Contributors
>>>
>>>    * Ted Ross,<tr...@redhat.com>
>>>
>>> ## Version
>>>
>>> 0.2
>>>
>>>
>>


Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Gordon Sim <gs...@redhat.com>.
On 07/20/2011 04:57 PM, Ken Giusti wrote:
> ----- Original Message -----
>> On 07/01/2011 07:04 AM, Ken Giusti wrote:
>>> For #2:
>>>
>>> The key for the header that contains the group identifier would be
>>> provided to
>>> the broker via configuration.
>>
>> A fixed name like 'qpid.group' seems less likely to confuse. Perhaps have
>> it default to 'qpid.group' but be configurable to another name in the
>> (unlikely?) event that it is required.
>>
>
> That's a good idea.  The current AMQP 1.0 draft defines a message property header called "group-id", perhaps we should use "qpid.group-id" as the default instead of "qpid.group"?

The advantage of a configurable header is that the group can be keyed 
off some application meaningful property and the application need not be 
altered just to introduce a particular arbitrary property.

I actually think that having a default makes it more confusing. You will 
still have to turn grouping on explicitly, right? So why not have the 
mechanism for so doing be the identification of a header from which the 
message's group can be determined? I don't find that confusing.
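The "name a header to turn grouping on" approach might look like this on the broker side. It is a minimal Python sketch with hypothetical names (`GroupedQueueConfig`, `group_header`, `group_of`) that are not qpidd configuration options; treating a missing header as "ungrouped" and normalizing values with `str()` are assumptions, the latter because the draft describes the group identifier as a string.

```python
from typing import Any, Mapping, Optional


class GroupedQueueConfig:
    """Per-queue grouping settings (hypothetical; not actual qpidd option names).

    Grouping is switched on by naming the header that carries the group id;
    there is deliberately no default header name.
    """

    def __init__(self, group_header: Optional[str] = None) -> None:
        self.group_header = group_header

    @property
    def grouping_enabled(self) -> bool:
        return self.group_header is not None

    def group_of(self, properties: Mapping[str, Any]) -> Optional[str]:
        """Return the message's group id, or None if the message is ungrouped."""
        if self.group_header is None:
            return None  # grouping not configured for this queue
        value = properties.get(self.group_header)
        # Assumption: a missing header means "ungrouped"; other values are
        # normalized with str(), since the QIP treats the id as a string.
        return None if value is None else str(value)
```

With `GroupedQueueConfig(group_header="cart-id")`, a shopping-cart application could key groups off a (made-up) `cart-id` property it already sends, without adding an arbitrary new property just for grouping.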

>>> 2. ensure that messages are not selectively acknowledged or released
>>> - order
>>> must be preserved in both cases.
>>
>> What happens if the consuming application fails to respect these
>> requirements?
>>
>
> Hmmm... good question.  My initial thoughts are that this would be considered an application error, since preserving order is important for this feature.  Perhaps the broker should respond with an error exception?

I think that is an application error. The aim is to allow processing of 
the messages in order. If the application fails to sensibly communicate 
regarding that processing, it is itself responsible for the results.

(I could imagine in the future ways of preventing this or alerting 
applications about it, but I think for now that should be out of scope).
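Since keeping acceptance in order is left to the consuming application, one way a consumer that processes messages from different groups concurrently might still satisfy requirement 2 is to buffer completions and accept only the contiguous prefix. A minimal Python sketch, assuming messages of any one group are still handled serially (for example, by one worker per group); `InOrderAcker` and its `accept` callback are hypothetical and not part of any Qpid client API.

```python
from typing import Callable, Set


class InOrderAcker:
    """Accept messages strictly in delivery order, whatever order processing finishes in.

    Hypothetical client-side helper: `accept` stands in for the client API call
    that accepts a message, and sequence numbers 0, 1, 2, ... are assigned
    locally in the order messages were delivered.
    """

    def __init__(self, accept: Callable[[int], None]) -> None:
        self._accept = accept
        self._next = 0                    # lowest sequence number not yet accepted
        self._finished: Set[int] = set()  # processed, waiting for earlier ones

    def completed(self, seq: int) -> None:
        """Mark message `seq` as fully processed, then accept every contiguous one."""
        if seq < self._next or seq in self._finished:
            raise ValueError(f"message {seq} was already completed")
        self._finished.add(seq)
        while self._next in self._finished:
            self._finished.remove(self._next)
            self._accept(self._next)
            self._next += 1
```

The cost of this design is that acceptance of every later message waits behind the slowest outstanding one, which in turn delays when the broker can release the next batch of a blocked group.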



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Ken Giusti <kg...@redhat.com>.
Alan - thanks for the input, replies inline....

-K

----- Original Message -----
> On 07/01/2011 07:04 AM, Ken Giusti wrote:
> > Folks,
> >
> > Here's a second draft of the proposal. I've tried to incorporate the
> > feedback provided during the last week. And I have tried to limit
> > the proposed feature set a bit more.
> >
> > Opinions welcome, thanks -
> >
> > -K
> >
> >
> > # Message Groups
> >
> > ## Status
> >
> > Draft
> >
> > ## Summary
> >
> > This document describes a new feature that would allow an application to
> > classify a set of related messages as belonging to a group. This document
> > also describes two policies that the broker could apply when delivering a
> > message group to one or more consumers.
> >
> >
> > ## Problem
> >
> > It would be useful to give an application the ability to classify a set of
> > messages as belonging to a single unit of work. Furthermore, if the broker
> > can identify messages belonging to the same unit of work, it can enforce
> > policies that control how that unit of work can be consumed.
> >
> > For example, it may be desirable to guarantee that a particular set of
> > messages are consumed by the same client, even in the case where there are
> > multiple clients consuming from the same queue.
> >
> > In a different scenario, it may be permissible for different clients to
> > consume messages from the same group, as long as it can be guaranteed that
> > the individual messages are not processed in parallel. In other words, the
> > broker would ensure that messages are processed by consumers in the same
> > order in which they were enqueued.
> >
> > For example, assume we have a shopping application that manages items in
> > a virtual shopping cart. A user may add an item to their shopping cart,
> > then change their mind and remove it. If the application sends an "add"
> > message to the broker, immediately followed by a "remove" message, they
> > will be queued in the proper order - "add", then "remove".
> >
> > However, if there are multiple consumers, it is possible that once a
> > consumer acquires the "add" message, a different consumer may acquire the
> > "remove" message. This allows both messages to be processed in parallel,
> > which could result in the "remove" operation being performed before the
> > "add" operation.
> >
> >
> > ## Solution
> >
> > This QIP proposes the following:
> >
> > 1) provide the ability for a message producer to designate a set of
> > messages as belonging to the same group.
> >
> > 2) allow the broker to identify messages that belong to the same group.
> >
> > 3) define policies for the broker that control the delivery of messages
> > belonging to the same group.
> >
> > For #1:
> >
> > The sending application would define a message header that would contain
> > the message's group identifier. The group identifier stored in that header
> > field would be a string value determined by the application. Messages from
> > the same group would have the same group identifier value.
> 
> The broker will likely have to keep state associated with a group. How
> does the broker know when a group ends so it can reclaim those resources?
> 


In this case, it probably would be sufficient to maintain a counter in the group's state. I'm assuming that we'll need a group lookup on message arrival to find that state (or create it on the first received message). Increment the counter when a message in that group is enqueued, decrement it when the message is dequeued. If it is decremented to zero, delete the group's state. The performance impact of such an approach is TBD, of course - we'd probably need to pool the state objects to limit memory thrashing, etc.
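A minimal sketch of that counter, assuming a hypothetical per-queue `GroupTable` in which each group's state is reduced to a bare message count (real group state would carry more, such as the owning consumer, and pooling is ignored):

```python
class GroupTable:
    """Per-queue table of message groups, keyed by group id (hypothetical).

    A group's state is created when its first message is enqueued and deleted
    when its count drops back to zero, so no explicit "end of group" marker
    is needed.
    """

    def __init__(self):
        self._counts = {}  # group id -> number of its messages on the queue

    def on_enqueue(self, group_id):
        # Lookup on arrival; creates the state for a previously unseen group.
        self._counts[group_id] = self._counts.get(group_id, 0) + 1

    def on_dequeue(self, group_id):
        remaining = self._counts[group_id] - 1
        if remaining == 0:
            del self._counts[group_id]  # last message gone: reclaim the state
        else:
            self._counts[group_id] = remaining

    def active_groups(self):
        return sorted(self._counts)


table = GroupTable()
table.on_enqueue("cart-42")
table.on_enqueue("cart-42")
table.on_dequeue("cart-42")
print(table.active_groups())  # ['cart-42']
table.on_dequeue("cart-42")
print(table.active_groups())  # []
```

One consequence of this scheme: a group id that is reused after its count has reached zero simply starts a fresh group.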



> > For #2:
> >
> > The key for the header that contains the group identifier would be provided to
> > the broker via configuration.
> 
> A fixed name like 'qpid.group' seems less likely to confuse. Perhaps have it
> default to 'qpid.group' but be configurable to another name in the (unlikely?)
> event that it is required.
> 

That's a good idea.  The current AMQP 1.0 draft defines a message property header called "group-id", perhaps we should use "qpid.group-id" as the default instead of "qpid.group"?
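A sketch of a default-but-configurable key, assuming the "qpid.group-id" name floated above (not settled) and modeling message headers as a plain dict rather than any real client API. Treating a message without the header as ungrouped is an assumption; the draft does not say what happens to such messages.

```python
DEFAULT_GROUP_KEY = "qpid.group-id"  # default suggested in this thread, not settled


class GroupKeyConfig:
    """Per-queue setting naming the header that carries the group id (hypothetical)."""

    def __init__(self, group_key=DEFAULT_GROUP_KEY):
        self.group_key = group_key

    def group_of(self, headers):
        """Return the group id from a message's headers, or None if absent."""
        value = headers.get(self.group_key)
        return None if value is None else str(value)


# Producer side: the application just sets the header on each message.
headers = {"qpid.group-id": "cart-42", "app.cart": "cart-7"}

print(GroupKeyConfig().group_of(headers))                      # cart-42
print(GroupKeyConfig(group_key="app.cart").group_of(headers))  # cart-7
print(GroupKeyConfig().group_of({}))                           # None
```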


> > From the broker's perspective, the number of different group id values would be
> > unlimited. And the value of group identifiers would not be provided ahead of
> > time by configuration: the broker must learn them at runtime.
> >
> > For #3:
> >
> > This QIP defines two message group policies. Additional policies may be
> > defined in the future.
> >
> > Policy #1: Exclusive Consumer
> >
> > With this policy, the broker would guarantee that all messages in a group
> > would be delivered to the same client.
> >
> > This policy would be configured on a per-queue basis. When the first message
> > of a new message group becomes available for delivery, the broker will
> > associate that group with the next available consumer. The broker would then
> > guarantee that all messages from that group are delivered to that consumer
> > only.
> >
> > The broker will maintain the group/client association for the lifetime of the
> > client. Should the client die or cancel its subscription, any unacknowledged
> > messages in the group will be assigned to a different client (preserving
> > message order). Group/client associations are not maintained across broker
> > restart. These associations must be replicated in a clustered broker.
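A toy, single-threaded model of the exclusive-consumer policy, to make the binding and reassignment rules concrete. All names are hypothetical and clustering is ignored; each message carries an enqueue sequence number so that requeued messages can be put back in their original order.

```python
from collections import deque


class ExclusiveGroupQueue:
    """Toy model of the exclusive-consumer policy (hypothetical names)."""

    def __init__(self):
        self.available = deque()  # (seq, group_id, body) in enqueue order
        self.owner = {}           # group_id -> consumer bound to that group
        self.unacked = {}         # consumer -> acquired but unacknowledged msgs
        self._seq = 0

    def enqueue(self, group_id, body):
        self.available.append((self._seq, group_id, body))
        self._seq += 1

    def acquire(self, consumer):
        """Return the first message this consumer may take, or None."""
        for i, msg in enumerate(self.available):
            # The first message of an unowned group binds it to this consumer;
            # messages of groups owned by someone else are skipped.
            if self.owner.setdefault(msg[1], consumer) == consumer:
                del self.available[i]
                self.unacked.setdefault(consumer, []).append(msg)
                return msg
        return None

    def ack(self, consumer, msg):
        self.unacked[consumer].remove(msg)

    def cancel(self, consumer):
        """Consumer died or unsubscribed: free its groups and requeue its
        unacknowledged messages in their original enqueue order."""
        returned = self.unacked.pop(consumer, [])
        self.available = deque(sorted(list(self.available) + returned))
        for group_id in [g for g, c in self.owner.items() if c == consumer]:
            del self.owner[group_id]


q = ExclusiveGroupQueue()
q.enqueue("cart-42", "add")
q.enqueue("cart-42", "remove")
q.enqueue("cart-7", "add")
print(q.acquire("c1"))  # (0, 'cart-42', 'add')   c1 now owns cart-42
print(q.acquire("c2"))  # (2, 'cart-7', 'add')    cart-42 is skipped for c2
q.cancel("c1")          # c1 goes away without acknowledging
print(q.acquire("c2"))  # (0, 'cart-42', 'add')   redelivered, order kept
```

Ownership entries here live as long as the consumer, matching the "lifetime of the client" rule; a clustered broker would also have to replicate the `owner` map.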
> >
> >
> > Policy #2: Sequenced Consumers
> >
> > With this policy, the broker would guarantee that the order in which messages
> > in a group are processed by consumers is the same order in which the messages
> > were enqueued. This guarantee would be upheld even if the messages of the
> > group are processed by different consumers. No two messages from the same
> > group would be processed in parallel by different consumers.
> >
> > Specifically, for any given group, the broker allows only the first N messages
> > in the group to be available for delivery to a particular consumer. The value
> > of N would be determined by the selected consumer's configured prefetch
> > capacity. The broker blocks access to the remaining messages in that group by
> > any other consumer. Once the selected consumer has acknowledged that first set
> > of delivered messages, the broker allows the next messages in the group to be
> > available for delivery. The next set of messages may be delivered to a
> > different consumer.
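A toy model of the sequenced-consumers gating, under the assumption that a group stays held by one consumer for as long as any message delivered from it is unacknowledged, with the consumer's prefetch capacity capping how many it can have in flight. Names are hypothetical; this is one reading of the rule above, not a settled design.

```python
from collections import deque


class SequencedGroupQueue:
    """Toy model of the sequenced-consumers policy (hypothetical names).

    A group is held by one consumer while any of its delivered messages is
    unacknowledged. The holder may receive more of the group as long as it has
    prefetch credit; other consumers skip the group until it is released.
    """

    def __init__(self):
        self.available = deque()  # (group_id, body) in enqueue order
        self.holder = {}          # group_id -> consumer currently holding it
        self.pending = {}         # group_id -> delivered but unacked count
        self.in_flight = {}       # consumer -> unacked count across groups

    def enqueue(self, group_id, body):
        self.available.append((group_id, body))

    def fetch(self, consumer, prefetch):
        """Deliver messages until this consumer's prefetch capacity is used."""
        batch = []
        i = 0
        while i < len(self.available) and self.in_flight.get(consumer, 0) < prefetch:
            group_id, body = self.available[i]
            if self.holder.setdefault(group_id, consumer) == consumer:
                del self.available[i]  # next message shifts into slot i
                self.pending[group_id] = self.pending.get(group_id, 0) + 1
                self.in_flight[consumer] = self.in_flight.get(consumer, 0) + 1
                batch.append((group_id, body))
            else:
                i += 1  # held by another consumer: leave it queued
        return batch

    def ack(self, consumer, group_id):
        self.in_flight[consumer] -= 1
        self.pending[group_id] -= 1
        if self.pending[group_id] == 0:
            # Everything delivered from this group is acknowledged: release it.
            del self.pending[group_id]
            del self.holder[group_id]


q = SequencedGroupQueue()
q.enqueue("cart-42", "add")
q.enqueue("cart-42", "remove")
print(q.fetch("c1", prefetch=1))  # [('cart-42', 'add')]
print(q.fetch("c2", prefetch=1))  # []  'remove' waits while c1 holds the group
q.ack("c1", "cart-42")
print(q.fetch("c2", prefetch=1))  # [('cart-42', 'remove')]
```

This is exactly the shopping-cart case from the Problem section: "remove" cannot reach a second consumer until "add" has been acknowledged.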
> >
> > This policy would be configured on a per-queue basis. Configuration would
> > include designating the key of the application header that specifies the group
> > id.
> >
> > Note well that, with this policy, the consuming application has to:
> >
> > 1. ensure that it has completely processed the data in a received message
> > before accepting that message, as described in Section 2.6.2. Transfer of
> > Responsibility, of the AMQP-0.10 specification.
> >
> > 2. ensure that messages are not selectively acknowledged or released - order
> > must be preserved in both cases.
> 
> What happens if the consuming application fails to respect these requirements?
> 

Hmmm... good question. My initial thought is that this would be considered an application error, since preserving order is essential to this feature. Perhaps the broker should respond with an exception?
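One way the broker could detect a violation of requirement 2, sketched under the assumption that it records, per group, the ids of delivered-but-unsettled messages in delivery order. "Settle" here covers both accept and release; `GroupOrderError` is a hypothetical error, not an existing Qpid exception. Real AMQP 0-10 accepts ranges of transfers at once, which this sketch handles one id at a time.

```python
from collections import deque


class GroupOrderError(Exception):
    """Hypothetical error returned when a grouped message is settled out of order."""


class DeliveryTracker:
    def __init__(self):
        self.outstanding = {}  # group_id -> deque of delivery ids, oldest first

    def delivered(self, group_id, delivery_id):
        self.outstanding.setdefault(group_id, deque()).append(delivery_id)

    def settle(self, group_id, delivery_id):
        """Accept or release a message; only the oldest outstanding one may be settled."""
        pending = self.outstanding.get(group_id)
        if not pending or pending[0] != delivery_id:
            raise GroupOrderError(
                f"group {group_id!r}: delivery {delivery_id} settled out of order")
        pending.popleft()
        if not pending:
            del self.outstanding[group_id]


tracker = DeliveryTracker()
tracker.delivered("cart-42", 1)
tracker.delivered("cart-42", 2)
try:
    tracker.settle("cart-42", 2)  # skips delivery 1
except GroupOrderError as err:
    print(err)  # group 'cart-42': delivery 2 settled out of order
tracker.settle("cart-42", 1)
tracker.settle("cart-42", 2)      # in order: accepted
```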


> 
> > Note well that in the case of either of these proposed policies, distinct
> > message groups would not block each other from delivery. For example, assume a
> > queue contains messages from two different message groups - say group "A" and
> > group "B" - and they are enqueued such that "A"'s messages are in front of
> > "B". If the first message of group "A" is in the process of being consumed by a
> > client, then the remaining "A" messages are blocked, but the messages of the
> > "B" group are available for consumption - even though it is "behind" group "A"
> > in the queue.
> >
> >
> > ## Rationale
> >
> >
> > The solution described above allows an application to designate a set of
> > related messages, and provides policies for controlling the consumption of the
> > message group.
> >
> >   * Goal: allow dynamic values for the group identifiers (no preconfiguration of identifiers necessary)
> >   * Goal: the number of message groups "in flight" should only be limited by the available resources (no need to configure a hard limit).
> >   * Goal: manageability: visibility into the message groups currently on a given queue
> >   * Goal: manageability: purge, move, reroute messages at the group level.
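A sketch of the two simplest manageability goals (visibility and purge), assuming a queue modeled as a deque of (group_id, body) pairs; the function names are hypothetical. Move and reroute would use the same filter, and messages already acquired by consumers are ignored here.

```python
from collections import Counter, deque


def group_summary(queue):
    """Count queued messages per group, e.g. for a management view."""
    return Counter(group_id for group_id, _ in queue)


def purge_group(queue, group_id):
    """Remove all queued messages of one group; others keep their order."""
    purged = [m for m in queue if m[0] == group_id]
    kept = [m for m in queue if m[0] != group_id]
    queue.clear()
    queue.extend(kept)
    return purged


q = deque([("A", "a1"), ("B", "b1"), ("A", "a2")])
print(group_summary(q))     # Counter({'A': 2, 'B': 1})
print(purge_group(q, "A"))  # [('A', 'a1'), ('A', 'a2')]
print(list(q))              # [('B', 'b1')]
```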
> >
> > ## Implementation Notes
> >
> >   * Queues: support configuration of the group identifier header key.
> >   * Messages: provide access to group identifier.
> >   * Queues: identify head message of next available message group.
> >   * Queues: block the trailing messages in a given message group from being consumed.
> >   * Consumers: track the message group of the currently acquired message(s).
> >   * Clustering: ensure state is replicated within the cluster as needed.
> >
> > ## Consequences
> >
> >   * __Development:__ No changes to the development process.
> >   * __Release:__ No changes to the release process.
> >   * __Documentation:__ User documentation will be needed to explain the feature, and the steps to configure and manage the new feature.
> >   * __Configuration:__ Yes: per-queue group identifier header key is configurable. Queue state with regard to in-flight message groups needs to be visible. Additional methods to purge/move/reroute a message group.
> >   * __Compatibility:__ Unlikely - new feature that would need to be enabled manually. Applications wishing to use the feature would need to implement a message grouping policy, and ensure the processing of received message data is compliant with the desired policy.
> >
> > ## References
> >
> >   * None.
> >
> > ## Contributor-in-Charge
> >
> > Kenneth Giusti,<kg...@redhat.com>
> >
> > ## Contributors
> >
> >   * Ted Ross,<tr...@redhat.com>
> >
> > ## Version
> >
> > 0.2
> >
> >
> 



Re: Proposed QIP: Support for Message Grouping in the broker.

Posted by Alan Conway <ac...@redhat.com>.
On 07/01/2011 07:04 AM, Ken Giusti wrote:
> Folks,
>
> Here's a second draft of the proposal.  I've tried to incorporate the feedback provided during the last week.  And I have tried to limit the proposed feature set a bit more.
>
> Opinions welcome, thanks -
>
> -K
>
>
> # Message Groups
>
> ## Status
>
> Draft
>
> ## Summary
>
> This document describes a new feature that would allow an application to
> classify a set of related messages as belonging to a group. This document also
> describes two policies that the broker could apply when delivering a message
> group to one or more consumers.
>
>
> ## Problem
>
> It would be useful to give an application the ability to classify a set of
> messages as belonging to a single unit of work.  Furthermore, if the broker can
> identify messages belonging to the same unit of work, it can enforce policies
> that control how that unit of work can be consumed.
>
> For example, it may be desirable to guarantee that a particular set of
> messages are consumed by the same client, even in the case where there are
> multiple clients consuming from the same queue.
>
> In a different scenario, it may be permissible for different clients to consume
> messages from the same group, as long as it can be guaranteed that the
> individual messages are not processed in parallel.  In other words, the broker
> would ensure that messages are processed by consumers in the same order in
> which they were enqueued.
>
> For example, assume we have a shopping application that manages items in a
> virtual shopping cart.  A user may add an item to their shopping cart, then
> change their mind and remove it.  If the application sends an "add" message to
> the broker, immediately followed by a "remove" message, they will be queued in
> the proper order - "add", then "remove".
>
> However, if there are multiple consumers, it is possible that once a consumer
> acquires the "add" message, a different consumer may acquire the "remove"
> message.  This allows both messages to be processed in parallel, which could
> result in the "remove" operation being performed before the "add" operation.
>
>
> ## Solution
>
> This QIP proposes the following:
>
> 1) provide the ability for a message producer to designate a set of messages
> as belonging to the same group.
>
> 2) allow the broker to identify messages that belong to the same group.
>
> 3) define policies for the broker that control the delivery of messages
> belonging to the same group.
>
> For #1:
>
> The sending application would define a message header that would contain the
> message's group identifier.  The group identifier stored in that header field
> would be a string value determined by the application.  Messages from the same
> group would have the same group identifier value.

The broker will likely have to keep state associated with a group. How does the 
broker know when a group ends so it can reclaim those resources?

> For #2:
>
> The key for the header that contains the group identifier would be provided to
> the broker via configuration.

A fixed name like 'qpid.group' seems less likely to confuse. Perhaps have it
default to 'qpid.group' but be configurable to another name in the (unlikely?)
event that it is required.

> From the broker's perspective, the number of different group id values would be
> unlimited.  And the value of group identifiers would not be provided ahead of
> time by configuration: the broker must learn them at runtime.
>
> For #3:
>
> This QIP defines two message group policies.  Additional policies may be
> defined in the future.
>
> Policy #1: Exclusive Consumer
>
> With this policy, the broker would guarantee that all messages in a group
> would be delivered to the same client.
>
> This policy would be configured on a per-queue basis.  When the first message
> of a new message group becomes available for delivery, the broker will
> associate that group with the next available consumer.  The broker would then
> guarantee that all messages from that group are delivered to that consumer
> only.
>
> The broker will maintain the group/client association for the lifetime of the
> client.  Should the client die or cancel its subscription, any unacknowledged
> messages in the group will be assigned to a different client (preserving
> message order).  Group/client associations are not maintained across broker
> restart.  These associations must be replicated in a clustered broker.
>
>
> Policy #2: Sequenced Consumers
>
> With this policy, the broker would guarantee that the order in which messages
> in a group are processed by consumers is the same order in which the messages
> were enqueued.  This guarantee would be upheld even if the messages of the
> group are processed by different consumers.  No two messages from the same
> group would be processed in parallel by different consumers.
>
> Specifically, for any given group, the broker allows only the first N messages
> in the group to be available for delivery to a particular consumer.  The value
> of N would be determined by the selected consumer's configured prefetch
> capacity.  The broker blocks access to the remaining messages in that group by
> any other consumer.  Once the selected consumer has acknowledged that first set
> of delivered messages, the broker allows the next messages in the group to be
> available for delivery.  The next set of messages may be delivered to a
> different consumer.
>
> This policy would be configured on a per-queue basis.  Configuration would
> include designating the key of the application header that specifies the group
> id.
>
> Note well that, with this policy, the consuming application has to:
>
> 1. ensure that it has completely processed the data in a received message
> before accepting that message, as described in Section 2.6.2. Transfer of
> Responsibility, of the AMQP-0.10 specification.
>
> 2. ensure that messages are not selectively acknowledged or released - order
> must be preserved in both cases.

What happens if the consuming application fails to respect these requirements?


> Note well that in the case of either of these proposed policies, distinct
> message groups would not block each other from delivery.  For example, assume a
> queue contains messages from two different message groups - say group "A" and
> group "B" - and they are enqueued such that "A"'s messages are in front of
> "B". If the first message of group "A" is in the process of being consumed by a
> client, then the remaining "A" messages are blocked, but the messages of the
> "B" group are available for consumption - even though it is "behind" group "A"
> in the queue.
>
>
> ## Rationale
>
>
> The solution described above allows an application to designate a set of
> related messages, and provides policies for controlling the consumption of the
> message group.
>
>   * Goal: allow dynamic values for the group identifiers (no preconfiguration of identifiers necessary)
>   * Goal: the number of message groups "in flight" should only be limited by the available resources (no need to configure a hard limit).
>   * Goal: manageability: visibility into the message groups currently on a given queue
>   * Goal: manageability: purge, move, reroute messages at the group level.
>
> ## Implementation Notes
>
>   * Queues: support configuration of the group identifier header key.
>   * Messages: provide access to group identifier.
>   * Queues: identify head message of next available message group.
>   * Queues: block the trailing messages in a given message group from being consumed.
>   * Consumers: track the message group of the currently acquired message(s).
>   * Clustering: ensure state is replicated within the cluster as needed.
>
> ## Consequences
>
>   * __Development:__ No changes to the development process.
>   * __Release:__ No changes to the release process.
>   * __Documentation:__ User documentation will be needed to explain the feature, and the steps to configure and manage the new feature.
>   * __Configuration:__ Yes: per-queue group identifier header key is configurable.  Queue state with regard to in-flight message groups needs to be visible.  Additional methods to purge/move/reroute a message group.
>   * __Compatibility:__ Unlikely - new feature that would need to be enabled manually.  Applications wishing to use the feature would need to implement a message grouping policy, and ensure the processing of received message data is compliant with the desired policy.
>
> ## References
>
>   * None.
>
> ## Contributor-in-Charge
>
> Kenneth Giusti,<kg...@redhat.com>
>
> ## Contributors
>
>   * Ted Ross,<tr...@redhat.com>
>
> ## Version
>
> 0.2
>
>
