You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by bmadigan <bm...@orbitz.com> on 2006/07/05 21:40:08 UTC

Failover topic subscribers

Is there a way to group Topic subscribers? What I want is to have clusters of
consumers with the network of brokers handling load balancing and failover.
Only one consumer per cluster would consume a message. Message Groups does
not seem to be implemented for this purpose (unless I am missing something). 
I think I can create a BrokerFilter to do this. Is there a better way to do
this?
-- 
View this message in context: http://www.nabble.com/Failover-topic-subscribers-tf1896829.html#a5188378
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Failover topic subscribers

Posted by maquanjun <ma...@gmail.com>.
It seems you need a queue,while not a topic ,I think

-- 
View this message in context: http://www.nabble.com/Failover-topic-subscribers-tf1896829.html#a5193027
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Failover topic subscribers

Posted by James Strachan <ja...@gmail.com>.
On 7/6/06, bmadigan <bm...@orbitz.com> wrote:
>
> From Hiram's comments ( http://issues.apache.org/activemq/browse/AMQ-452 )
> "Basically this means that we need to keep a per subscription ack table so
> that concurrent consumer can ack random messages in the subscription list."
>
> So acknowledge() in BrokerFilter is invoked any time a subscriber sends an
> ack. I'm guessing I can spoof an acknowledge to the other subscribers in the
> 'group', making the group behave like a concurrent consumer. But this would
> only work if message delivery was synchronous for a subscriber group,
> correct? I have to look at how delivery happens for topic subscribers.

As I mentioned a couple of messages ago, I think using a physical
queue per logical topic subscription is gonna be the easiest as it
does exactly whats required (load balancing of messages across
consumers for the same logical subscription and dealing with
acknowledges coming back in random order). Each logical topic
subscription needs to acknowledge individual messages and the messages
need to be load balanced across each consumer for a given logical
subscription so messages are only processed once per logical
subscription etc.

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Failover topic subscribers

Posted by bmadigan <bm...@orbitz.com>.
>From Hiram's comments ( http://issues.apache.org/activemq/browse/AMQ-452 )
"Basically this means that we need to keep a per subscription ack table so
that concurrent consumer can ack random messages in the subscription list."

So acknowledge() in BrokerFilter is invoked any time a subscriber sends an
ack. I'm guessing I can spoof an acknowledge to the other subscribers in the
'group', making the group behave like a concurrent consumer. But this would
only work if message delivery was synchronous for a subscriber group,
correct? I have to look at how delivery happens for topic subscribers.
-- 
View this message in context: http://www.nabble.com/Failover-topic-subscribers-tf1896829.html#a5203662
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Failover topic subscribers

Posted by bmadigan <bm...@orbitz.com>.
Very cool, thanks! I am getting the subversion client installed so I can get
the latest source. 
-- 
View this message in context: http://www.nabble.com/Failover-topic-subscribers-tf1896829.html#a5204881
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Failover topic subscribers

Posted by bmadigan <bm...@orbitz.com>.
This is almost working, there are a few things I need to fix:
- Need to figure out how to add the new Broker to the factory without using
the plugin loader
- It may not be a problem, but I'm synchronizing  on next when I create the
queues for the virtual groups in addConsumer().  This could be finer grained
I think.
- I'm calling ConsumerInfo.setDestination(virtualQueue) to point the
consumer to a virtual queue. This is probably incorrect, not sure if there
is a better way. 
- The virtual queues can't provide subscription recovery. Not sure how to
handle that.

I created a BrokerFilter subclass which overrides addConsumer() and send():

 public Subscription addConsumer(ConnectionContext cc,
                                    ConsumerInfo ci) throws Exception {
        synchronized(next){
            String name  = ci.getDestination().getPhysicalName();
            if(name.startsWith(VIRTUAL)){
                Set destinations = getDestinations(
                        new ActiveMQQueue(name));
                if(destinations.size()==0){//create a new virtual queue
                    ActiveMQQueue queue = new ActiveMQQueue(
                            name+"?consumer.exclusive=true");
                        next.addDestination(cc,queue);
                    ci.setDestination(queue);
                }else{ //queue exists, add the consumer
                    ActiveMQQueue queue = (ActiveMQQueue)
                            destinations.iterator().next();
                    ci.setDestination(queue);
                }
            }
        }
        return next.addConsumer(cc, ci);
    }

    public void send(ConnectionContext ctx,
                     Message message) throws Exception {
        String topic = message.getDestination().getPhysicalName();
        Iterator destinations = getDestinations(
                new ActiveMQQueue(VIRTUAL + ".*." + topic)).iterator();
        while(destinations.hasNext()){
            Destination dest = (Destination) destinations.next();
            dest.send(ctx, message);
        }
        next.send(ctx, message);
    }

Except for the subscription recovery part, this seems to work. 

-- 
View this message in context: http://www.nabble.com/Failover-topic-subscribers-tf1896829.html#a5317425
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Failover topic subscribers

Posted by James Strachan <ja...@gmail.com>.
On 7/6/06, James Strachan <ja...@gmail.com> wrote:
> On 7/6/06, bmadigan <bm...@orbitz.com> wrote:
> >
> > I am definately taking a stab at this already, so I'll let you know what I
> > come up with.
>
> Great! :)
>
> BTW am just about to commit an implementation of getDestinations() on
> BrokerFilter for you to do the wildcard lookup of all destinations
> matching ${prefix}.*.${topic} which will be handy for finding the
> 'logical topic subscribers' in a simple map-ish lookup. It returns a
> Set so you just need to iterate through it and do a send of a message
> to the queue for each result.

Its in trunk now, so you should be able to do this from inside a BrokerFilter...

String prefix = "Virtual.";
String topic = "FOO.BAR";

Set physicalQueues = getDestinations(new ActiveMQQueue(prefix + ".*." + topic));
foreach (physicalQueues) {
  send message to it...

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Failover topic subscribers

Posted by James Strachan <ja...@gmail.com>.
On 7/6/06, bmadigan <bm...@orbitz.com> wrote:
>
> I am definately taking a stab at this already, so I'll let you know what I
> come up with.

Great! :)

BTW am just about to commit an implementation of getDestinations() on
BrokerFilter for you to do the wildcard lookup of all destinations
matching ${prefix}.*.${topic} which will be handy for finding the
'logical topic subscribers' in a simple map-ish lookup. It returns a
Set so you just need to iterate through it and do a send of a message
to the queue for each result.


>  Creating the duplicate messages is not too much of a concern
> right away, especially if the messages are small. For us that will mean 2
> messages per 'group' instead of 1, so not such a big deal. I don't think
> anyone is planning to have consumer clusters of more than 2 nodes.

Cool.

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Failover topic subscribers

Posted by bmadigan <bm...@orbitz.com>.
I am definately taking a stab at this already, so I'll let you know what I
come up with.  Creating the duplicate messages is not too much of a concern
right away, especially if the messages are small. For us that will mean 2
messages per 'group' instead of 1, so not such a big deal. I don't think
anyone is planning to have consumer clusters of more than 2 nodes.
-- 
View this message in context: http://www.nabble.com/Failover-topic-subscribers-tf1896829.html#a5203850
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Failover topic subscribers

Posted by James Strachan <ja...@gmail.com>.
On 7/6/06, James Strachan <ja...@gmail.com> wrote:
[snip]
> Basically we could do something like this...
>
> * we configure one or more wildcards to indicate the 'virtual topics'
> to use.  e.g. something like this in the activemq.xml...
>
> <virtualTopic topic="FOO.>" queuePrefix="VirtualTopic."/>
>
> * when a durable topic message is sent to a virtual topic FOO.BAR  we
> dispatch it to the non-persistent Topic region as before, but instead
> of using the persistent topic region we do the following...

Strike that. Replace the above with...

* when a topic message is sent to a virtual topic FOO.BAR  we dispatch
it as normal (as folks could be using regular topic subscriptions -
durable and non-durable on this topic. But in addition we also then...


> * look for all the queues called VirtualTopic.*.FOO.BAR and send a
> copy of the message to it.

> * then any consumer can subscribe to a queue called
> VirtualTopic.${subscriberName}.FOO.BAR

etc.

So its much simpler afterall :) The interceptor just needs to look for
all the available queues for ${prefix}.*.${topicName}. A simple
implemenation of that would iterate through the
broker.getDestinationMap().

A more optimal solution would be to do something like

Set destinations = getDestinations(new ActiveMQQueue(prefix + ".*." +
topicName);

(I'm just adding an implementation of the getDestinations() method
into the Broker/Region interfaces)

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Failover topic subscribers

Posted by James Strachan <ja...@gmail.com>.
On 7/6/06, bmadigan <bm...@orbitz.com> wrote:
> Would it be possible to implement this as a Broker plugin or would more
> fundamental changes need to be made? I don't think I have enough control
> over sending messages by overriding send() and acknowledge() in
> BrokerFilter.

Very efficient version
================
Its a reasonable amount of work to do very efficiently (thats often
the way - its often easy to do things, but much harder to do them very
efficiently :).

Basically for a really efficient implementation we need to create a
version of durable topics which supports out of order acknowledgements
(so rather than keeping the last ack'd message ID per subscription
we'd maintain all the to-be-ack'd message Ids) then we'd need to build
a Queue abstraction around an individual topic subscription - then use
a BrokerFilter to delegate to the new types of durable topics if they
are 'queue enabled' etc.

All of this would be required to allow the message to be persisted
once, then each consumer just has a small acknowledgement table etc.
So a fair amount of work to do really efficiently.

However given the usefulness of such a feature - how about we
implement a much simpler solution to get the feature done sooner...


Simpler to implement version
=======================
A simpler solution could just be that we reuse the existing queue
implementations (so each 'virtual topic subscription' actually gets
its own physical queue) and we in effect just copy the message to all
the physical queues using one actual queue per logical topic
subscription. This would be fairly simple to implement using an
inteceptor - and is fairly simple to do in fact - the only downside is
it results in the message being persisted N times, once for each
subscription.

Basically we could do something like this...

* we configure one or more wildcards to indicate the 'virtual topics'
to use.  e.g. something like this in the activemq.xml...

<virtualTopic topic="FOO.>" queuePrefix="VirtualTopic."/>

* when a durable topic message is sent to a virtual topic FOO.BAR  we
dispatch it to the non-persistent Topic region as before, but instead
of using the persistent topic region we do the following...

* look for all the queues called VirtualTopic.*.FOO.BAR and send a
copy of the message to it. (We could maybe add some prefix to the
beginning to separate out the physical implementation queues for the
virtual topics).

* then any consumer can subscribe to a queue called
VirtualTopic.${subscriberName}.FOO.BAR which if you are the first
person to use a given subscriptionName the effect is to dynamically
create a virtual durable topic subscription for ${subscriptionName}.
You would then be using regular queue semantics - so folks can browse
the queue, delete messages from the queue and so forth.

e.g. we could create 100 consumers on VirtualTopic.A.FOO.BAR and 50
consumers on VirtualTopic.B.FOO.BAR which would logically represent 2
virtual topic subscriptions of topic FOO.BAR. i.e. you'd then get all
the benefit of queues - load balancing, parallelization, failover
along with features like exclusive queues and message groups - yet the
producer is a regular topic publisher and the consumer sides are
queues - so the effect is all standard JMS contracts.

Incidentally the reason I'm suggesting to use queue names of the form
${prefix}.${subscriberName}.${topicName} is so that we could then
support wildcards on the topics.

e.g. VirtualTopic.consumerAbc.Products.Electonics.>


I'd really love this feature - as durable topics in JMS are nowhere
near as useful as they should be - so starting with the simpler
approach sounds good to me :)

Anyone fancy taking a stab at implementing it? :)

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Failover topic subscribers

Posted by bmadigan <bm...@orbitz.com>.
Would it be possible to implement this as a Broker plugin or would more
fundamental changes need to be made? I don't think I have enough control
over sending messages by overriding send() and acknowledge() in
BrokerFilter.
-- 
View this message in context: http://www.nabble.com/Failover-topic-subscribers-tf1896829.html#a5202219
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Failover topic subscribers

Posted by James Strachan <ja...@gmail.com>.
On 7/5/06, bmadigan <bm...@orbitz.com> wrote:
>
> I just found this post:
>
> http://www.nabble.com/Clustering-Topic-Consumers-tf1835532.html#a5009892
>
> We can create logical topics using a Queue for each topic, then we can use
> Exclusive Consumers for load balancing. I don't think that would require
> much change.

Yes. If you want to increase the parallelization you can use message groups.

One day I'd like us to implement virtual queues on top of durable
topics so folks can use a single durable topic subscription as a
virtual queue  (so getting things like exclusive consumer or message
group or just load balancing logic etc).

http://issues.apache.org/activemq/browse/AMQ-452
-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Failover topic subscribers

Posted by bmadigan <bm...@orbitz.com>.
I just found this post:

http://www.nabble.com/Clustering-Topic-Consumers-tf1835532.html#a5009892

We can create logical topics using a Queue for each topic, then we can use
Exclusive Consumers for load balancing. I don't think that would require
much change. 
-- 
View this message in context: http://www.nabble.com/Failover-topic-subscribers-tf1896829.html#a5190003
Sent from the ActiveMQ - User forum at Nabble.com.