Posted to users@activemq.apache.org by James Strachan <ja...@gmail.com> on 2006/07/14 11:37:41 UTC

Virtual Topics (was Re: Failover topic subscribers)

I thought I'd change the subject as really we are discussing a kind of
Virtual Topics where folks can use Queues to subscribe and consume
from them etc...


On 7/13/06, bmadigan <bm...@orbitz.com> wrote:
>
> This is almost working,

YAY!

> there are a few things I need to fix:
> - Need to figure out how to add the new Broker to the factory without using
> the plugin loader

I was thinking, we should maybe add the Virtual Topic interceptor to
the broker by default, but allow the configuration to be overridden in
the activemq.xml as I can't help thinking it'd be great to be able to use
it out of the box.

We've already got the "ActiveMQ.Advisory." prefix in topics to be used
for advisories. So how about we use ActiveMQ.Virtual. as the default
prefix for virtual topics & we let folks customize this if they don't
like the defaults?
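A minimal sketch of that naming convention, assuming queues are named
<prefix><group>.<topic> as in the examples elsewhere in this thread. The
class and method names are hypothetical and not part of ActiveMQ.

```java
/** Illustrative naming helper; the class and method names are hypothetical. */
final class VirtualTopicNames {
    /** Default prefix proposed in this thread; meant to be overridable. */
    static final String DEFAULT_PREFIX = "ActiveMQ.Virtual.";

    private final String prefix;

    VirtualTopicNames() {
        this(DEFAULT_PREFIX);
    }

    VirtualTopicNames(String prefix) {
        // Normalise so the prefix always ends with the '.' path separator.
        this.prefix = prefix.endsWith(".") ? prefix : prefix + ".";
    }

    /** Queue that one consumer group uses to receive a topic's messages. */
    String queueFor(String group, String topic) {
        return prefix + group + "." + topic;
    }

    /** Wildcard covering every group's queue for the given topic. */
    String wildcardFor(String topic) {
        return prefix + "*." + topic;
    }

    /** True if the destination name falls under the virtual prefix. */
    boolean isVirtual(String destinationName) {
        return destinationName.startsWith(prefix);
    }
}
```

With the defaults, group "A" consuming topic "TEST" would use the queue
ActiveMQ.Virtual.A.TEST, and a customised prefix only changes the leading
part of the name.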

So we should maybe add a VirtualDestinationPlugin property on the
BrokerService which is created by default (unless explicitly disabled
using a bean property) which would default to something sensible
that could be overridden, or folks could set the virtual destinations
to something else if they prefer etc.
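To make the "on by default, but switchable" idea concrete, here is a small
sketch. ToyBrokerService and its properties are invented for illustration
only; they are not the real BrokerService API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a broker service; every name here is hypothetical. */
final class ToyBrokerService {
    private boolean virtualTopicsEnabled = true;              // on unless disabled
    private String virtualTopicPrefix = "ActiveMQ.Virtual.";  // sensible default

    void setVirtualTopicsEnabled(boolean enabled) {
        this.virtualTopicsEnabled = enabled;
    }

    void setVirtualTopicPrefix(String prefix) {
        this.virtualTopicPrefix = prefix;
    }

    /** Names of the interceptors wrapping the core broker, outermost first. */
    List<String> interceptors() {
        List<String> chain = new ArrayList<>();
        if (virtualTopicsEnabled) {
            chain.add("virtual-topics[" + virtualTopicPrefix + "]");
        }
        chain.add("core");
        return chain;
    }
}
```

A broker created with no configuration gets the virtual topic interceptor,
while setting the (hypothetical) bean property to false leaves only the core
broker in the chain.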


> - It may not be a problem, but I'm synchronizing  on next when I create the
> queues for the virtual groups in addConsumer().  This could be finer grained
> I think.

Am not sure if you need to do that; I think the default mechanism of
clients consuming on the queue with the destination auto-created on
the fly would be fine?
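On the finer-grained locking question, a minimal sketch of per-name lazy
creation without a broker-wide lock. It assumes Java 8 or later
(ConcurrentHashMap.computeIfAbsent); LazyQueueRegistry and ToyQueue are
hypothetical names and not how the broker itself is implemented.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy registry that creates each queue at most once, on first lookup. */
final class LazyQueueRegistry {
    /** Placeholder for a real destination object. */
    static final class ToyQueue {
        final String name;

        ToyQueue(String name) {
            this.name = name;
        }
    }

    private final ConcurrentMap<String, ToyQueue> queues = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    /** Returns the queue for the name, creating it atomically if absent. */
    ToyQueue lookup(String name) {
        return queues.computeIfAbsent(name, n -> {
            created.incrementAndGet(); // counts how many queues were really built
            return new ToyQueue(n);
        });
    }

    int createdCount() {
        return created.get();
    }
}
```

Concurrent consumers asking for the same name all get the same instance, and
lookups for different names do not block each other on one shared monitor.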


> - I'm calling ConsumerInfo.setDestination(virtualQueue) to point the
> consumer to a virtual queue. This is probably incorrect, not sure if there
> is a better way.

Are you trying to transform a durable topic consumer into a queue
consumer? Am not sure you need to. Am thinking if you are a topic
consumer (durable or non-durable) then we leave you as you are; a
fully JMS compliant durable topic consumer. However to use the nice
queue-centric virtual topics, you really use a queue consumer of the
right name and things all just work. (BTW the topic regions are
optimised so that if no consumers are available, publishing to a topic
is a no-op)
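A toy model of the semantics described here, with no JMS involved: each
consumer group has its own queue, every group's queue gets a copy of each
published message, and consumers within a group share that queue. The
round-robin dispatch and all names are assumptions made for the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model: one queue per consumer group, each queue load-balanced. */
final class GroupFanOut {
    /** A queue that hands each message to exactly one of its consumers. */
    static final class GroupQueue {
        private final List<List<String>> consumers = new ArrayList<>();
        private int next = 0;

        /** Registers a consumer and returns the list it receives into. */
        List<String> addConsumer() {
            List<String> inbox = new ArrayList<>();
            consumers.add(inbox);
            return inbox;
        }

        void deliver(String message) {
            if (consumers.isEmpty()) {
                return; // a real queue would hold the message; the toy drops it
            }
            consumers.get(next).add(message);
            next = (next + 1) % consumers.size();
        }
    }

    private final Map<String, GroupQueue> groups = new LinkedHashMap<>();

    /** Returns the queue for a group, creating it on first use. */
    GroupQueue group(String name) {
        return groups.computeIfAbsent(name, n -> new GroupQueue());
    }

    /** Publishes to the topic: every group's queue receives one copy. */
    void publish(String message) {
        for (GroupQueue queue : groups.values()) {
            queue.deliver(message);
        }
    }
}
```

With two consumers in group A and one in group B, ten published messages
give group A ten messages in total across its two consumers, and group B's
single consumer all ten.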


> - The virtual queues can't provide subscription recovery. Not sure how to
> handle that.

Yeah that's true. Am not too worried about that right now - but we
could look at fixing that later on.


> I created a BrokerFilter subclass which overrides addConsumer() and send():
>
>  public Subscription addConsumer(ConnectionContext cc,
>                                     ConsumerInfo ci) throws Exception {
>         synchronized(next){
>             String name  = ci.getDestination().getPhysicalName();
>             if(name.startsWith(VIRTUAL)){
>                 Set destinations = getDestinations(
>                         new ActiveMQQueue(name));
>                 if(destinations.size()==0){//create a new virtual queue
>                     ActiveMQQueue queue = new ActiveMQQueue(
>                             name+"?consumer.exclusive=true");
>                     next.addDestination(cc, queue);
>                     ci.setDestination(queue);
>                 }else{ //queue exists, add the consumer
>                     ActiveMQQueue queue = (ActiveMQQueue)
>                             destinations.iterator().next();
>                     ci.setDestination(queue);
>                 }
>             }
>         }
>         return next.addConsumer(cc, ci);
>     }
>
>     public void send(ConnectionContext ctx,
>                      Message message) throws Exception {
>         String topic = message.getDestination().getPhysicalName();
>         Iterator destinations = getDestinations(
>                 new ActiveMQQueue(VIRTUAL + ".*." + topic)).iterator();
>         while(destinations.hasNext()){
>             Destination dest = (Destination) destinations.next();
>             dest.send(ctx, message);
>         }
>         next.send(ctx, message);
>     }
>
> Except for the subscription recovery part, this seems to work.

Looks great - though am thinking we only really need the send() part -
as the other stuff like adding consumers should just work out of the
box (unless I'm missing something).
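A self-contained sketch of a filter that only overrides send(). The types
below merely mimic the shape of a broker interceptor chain; ToyBroker,
RecordingBroker and queueExists() are hypothetical stand-ins, not ActiveMQ
classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy interceptor chain; the types only mimic a broker filter. */
final class FanOutDemo {
    interface ToyBroker {
        void send(String destination, String message);
    }

    /** End of the chain: records what was delivered where. */
    static final class RecordingBroker implements ToyBroker {
        final List<String> log = new ArrayList<>();

        public void send(String destination, String message) {
            log.add(destination + " <- " + message);
        }
    }

    /** Filter that copies topic messages to the matching group queues. */
    static final class VirtualTopicFilter implements ToyBroker {
        static final String PREFIX = "ActiveMQ.Virtual.";
        private final ToyBroker next;
        private final Set<String> knownQueues = new TreeSet<>();

        VirtualTopicFilter(ToyBroker next) {
            this.next = next;
        }

        /** Stands in for the broker already knowing a consumer's queue. */
        void queueExists(String queueName) {
            knownQueues.add(queueName);
        }

        public void send(String topic, String message) {
            for (String queue : knownQueues) {
                if (isGroupQueueFor(queue, topic)) {
                    next.send(queue, message);
                }
            }
            next.send(topic, message); // normal topic delivery still happens
        }

        /** True for names of the form PREFIX + oneSegment + "." + topic. */
        static boolean isGroupQueueFor(String queue, String topic) {
            int groupEnd = queue.length() - topic.length() - 1;
            if (!queue.startsWith(PREFIX) || !queue.endsWith("." + topic)
                    || groupEnd <= PREFIX.length()) {
                return false;
            }
            String group = queue.substring(PREFIX.length(), groupEnd);
            return group.indexOf('.') < 0;
        }
    }
}
```

Consumers never appear in the filter: they simply attach to queues of the
right name, and the filter fans each topic message out to whichever of those
queues already exist.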

BTW do you want to submit a patch, then we can start wiring this stuff
in? Great work though - am excited to see this stuff implemented! :)

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by bmadigan <bm...@orbitz.com>.

James.Strachan wrote:
> 
> Incidentally in the example you give, was the consumer adding a
> consumer to a temporary queue?
> 

Hmm, not intentionally, I'm not sure. I changed the subscriber from a Topic
type to a Queue using the new Virtual prefix. When it was a Topic, I could
see the Virtual consumer being added in addConsumer(..), but after changing
it to a Queue type, addConsumer() was only being called for the advisory
topic. A little confusing. I'm not going to worry about it.
-- 
View this message in context: http://www.nabble.com/Re%3A-Virtual-Topics-%28was-Re%3A-Failover-topic-subscribers%29-tf1942508.html#a5382589
Sent from the ActiveMQ - Dev forum at Nabble.com.


Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by bmadigan <bm...@orbitz.com>.
I turned off advisory just to get it out of the way (advisorySupport=false),
but it looks like advisory topics are still being created.  


Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by James Strachan <ja...@gmail.com>.
Sorry I've not responded to this thread yet - been a bit snowed under with
other stuff. Yes I think we should be creating the destinations in
addConsumer() BTW.

The creation of ActiveMQ.Advisory.TempQueue is, I think, part of the
usual advisory mechanism...

http://incubator.apache.org/activemq/advisory-message.html

Incidentally in the example you give, was the consumer adding a
consumer to a temporary queue?


-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by bmadigan <bm...@orbitz.com>.
I added logging to AbstractRegion and the virtual topics broker to see what
destinations are created when a consumer connects. 

INFO  ConsumerGroupsBroker - Adding Consumer for Destination:
topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
INFO  AbstractRegion  - Adding consumer:
ID:photon.duncllc.com-60318-1153164652757-1:0:-1:1

When I add a consumer, I'm not sure why its destination is a temp advisory
topic. I expect to see the destination that the consumer is listening on:
 <property name="destination" value="ActiveMQ.Virtual.TESTGROUP1.TEST"/>

???
I just noticed that the last 3 replies were to myself, so you were probably
not notified.


Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by bmadigan <bm...@orbitz.com>.
Or AbstractRegion... I think I can get this to work without messing up any of
the persistence logic.


Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by bmadigan <bm...@orbitz.com>.
In order for virtual topics to work, getDestinations(..) needs to return the
virtual destinations. This doesn't happen now since Queues prefixed with
ActiveMQ.Virtual don't ever get added to the Destinations Set (nothing
actually sends them a message). I'm looking for the place in the broker
where I could change this behavior. Any ideas? RegionBroker looks
like it does most of the work.



Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by bmadigan <bm...@orbitz.com>.
I updated AbstractRegion, but I am not seeing any change in the behavior
after a clean build. Where was the change? addConsumer(..) in AbstractRegion
appears to be the same. Subsequent updates did not show any new changes in
other files.

When a queue consumer connects, I still see only:
INFO  AbstractRegion                 - Adding destination:
topic://ActiveMQ.Advisory.Connection
INFO  AbstractRegion                 - Adding destination:
topic://ActiveMQ.Advisory.Topic

So in send(), there are no wildcard destinations found:
 INFO  ConsumerGroupsBroker           - Found 0 Virtual Destinations for
ActiveMQ.Virtual.*.TEST
               
Thanks,
Brian Madigan


Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by James Strachan <ja...@gmail.com>.
On 7/24/06, bmadigan <bm...@orbitz.com> wrote:
> Thanks James,
> looks good. Now I see what was wrong with the wildcard :)

:)

> Is there a rough release date for the 4.1 codebase?

No - I'll start a separate thread on releases to try to figure out what's
left to do to start thinking of doing a 4.1 etc

> Glad things could come together,

Me too! :) - thanks for all your hard work on this, Brian

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by bmadigan <bm...@orbitz.com>.
Thanks James, 
looks good. Now I see what was wrong with the wildcard :) 
Is there a rough release date for the 4.1 codebase?
Glad things could come together,
Brian Madigan


Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by James Strachan <ja...@gmail.com>.
Many thanks! I've added your patch to svn trunk and have got the test
case working (by tweaking the wildcard slightly and adding a pause
between sending the messages and asserting they are consumed). Yay!
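On the pause between sending and asserting: with asynchronous listeners, a
latch is one common alternative to a fixed sleep. The sketch below uses
plain java.util.concurrent with no JMS, and is not the change that went into
trunk; AwaitDelivery and sendAndAwait are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy asynchronous delivery; all names are hypothetical. */
final class AwaitDelivery {
    /** Dispatches `total` messages asynchronously and waits for them. */
    static int sendAndAwait(int total, long timeoutMillis) {
        AtomicInteger received = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(total);
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < total; i++) {
                // Stands in for the broker calling a listener on its own thread.
                dispatcher.execute(() -> {
                    received.incrementAndGet();
                    done.countDown();
                });
            }
            // Block until every message arrived or the timeout expires.
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible
        } finally {
            dispatcher.shutdown();
        }
        return received.get();
    }
}
```

The assertion then runs on the returned count, so the test waits only as
long as delivery actually takes, up to the timeout.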

For now I've made the code add a VirtualTopicBroker by default -
unless it's disabled. We may want to get more clever going forward by
allowing virtual topics & queues to be configured via XML in the
broker.xml file.

See if SVN trunk is working for you now - from my initial testing it
looks OK to me

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by bmadigan <bm...@orbitz.com>.
Thanks James, that test case works for me too. I wrote a use case that (I
think) covers the base Virtual Topic functionality. There is a problem
somewhere that causes this test to fail. Running it in debug I can see that
the Message is dispatched, but not delivered for some reason. Most of the
internals for virtual topics seem to be working fine though, so that's good
news. If you run the test case below, you can see that the MessageListeners
on the queue don't get any messages. There is some additional code to add
the VirtualTopicBroker to the interceptor chain in BrokerService (or it can
be added as a plugin).

Test case:

package org.apache.activemq.usecases;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;

import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Message;

public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {

    private Connection connection;

    public void testVirtualTopicCreation() throws Exception {
        if (connection == null) {
            connection = createConnection();
        }

        String queueAName = "ActiveMQ.Virtual.A.TEST";
        // create consumer 'cluster': two consumers on the same virtual queue
        ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
        ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);

        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        MessageConsumer c1 = session.createConsumer(queue1);
        MessageConsumer c2 = session.createConsumer(queue2);

        MessageCountListener exclusive1 = new MessageCountListener();
        c1.setMessageListener(exclusive1);

        MessageCountListener exclusive2 = new MessageCountListener();
        c2.setMessageListener(exclusive2);

        // create topic producer
        MessageProducer producer =
                session.createProducer(new ActiveMQTopic("TEST"));
        assertNotNull(producer);

        int total = 10;
        for (int i = 0; i < total; i++) {
            producer.send(session.createTextMessage("xxxxxxxxxxxxxxxxxx"));
        }

        // the two consumers share one queue, so their counts are summed;
        // note the check runs straight after the sends, with no wait
        int delivered = exclusive1.getCount() + exclusive2.getCount();
        assertTrue("Expected " + total + " delivered, found " + delivered,
                delivered == total);
    }

    class MessageCountListener implements MessageListener {

        private int count = 0;

        public void onMessage(Message m) {
            System.out.println("Got one! " + count);
            count++;
        }

        public int getCount() {
            return count;
        }
    }

    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
        }
        super.tearDown();
    }
}


the Broker:

package org.apache.activemq.broker;

import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;

import java.util.Iterator;
import java.util.Set;

/**
 * Broker interceptor that copies each sent message onto the queues
 * matching the virtual wildcard; it can also be installed as a plugin.
 */
public class VirtualTopicBroker
        extends BrokerFilter
        implements BrokerPlugin {

    public static final String VIRTUAL_WILDCARD = "ActiveMQ.Virtual.*";

    public VirtualTopicBroker(Broker next) {
        super(next);
    }

    // no-arg constructor for use as a plugin; see installPlugin()
    public VirtualTopicBroker() {
        super(null);
    }

    public void send(ConnectionContext ctx,
                     Message message) throws Exception {

        String name = message.getDestination().getPhysicalName();

        // for a topic named "TEST" this yields "ActiveMQ.Virtual.*TEST"
        // (no '.' between the wildcard and the topic name)
        String virtualName = VIRTUAL_WILDCARD + name;

        Set destinations = getDestinations(
                new ActiveMQQueue(virtualName));

        // copy the message to every matching virtual queue
        for (Iterator iter = destinations.iterator();
             iter.hasNext();) {
            Destination dest = (Destination) iter.next();
            dest.send(ctx, message);
        }
        // then let the normal topic delivery proceed
        next.send(ctx, message);
    }

    public Broker installPlugin(Broker broker) throws Exception {
        return new VirtualTopicBroker(broker);
    }
}




Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by James Strachan <ja...@gmail.com>.
OK I've just added a test case that seems to show that the
destinations are being created lazily when a consumer is added...

http://svn.apache.org/repos/asf/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NewConsumerCreatesDestinationTest.java

I've tried to simulate the kind of code you'll be calling in the virtual
topics stuff - namely using
broker.getDestination(topicSubscriberWildcard) to find all the virtual
topic subscriber queues etc - and it seems to work for me using SVN
trunk.


-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by James Strachan <ja...@gmail.com>.
On 7/19/06, bmadigan <bm...@orbitz.com> wrote:
>
> Is this the change to call lookup(...) in addConsumer( )?

Yes

> Looks like it's not
> being called.

Damn - sorry about that. I knew I should have taken the time to write
a test case :). Lemme see if I get a chance to write a little test case
today....

> I'd like to run the broker in debug, I'm having trouble
> figuring out where exactly the lazy creation of destinations is happening.

It should be the lookup() which calls addDestination() I think. More
in a little while...

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by bmadigan <bm...@orbitz.com>.
Is this the change to call lookup(...) in addConsumer( )? Looks like it's not
being called. I'd like to run the broker in debug, I'm having trouble
figuring out where exactly the lazy creation of destinations is happening.


Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by James Strachan <ja...@gmail.com>.
On 7/14/06, bmadigan <bm...@orbitz.com> wrote:
>
> I think something will need to be done in addConsumer(...). When a consumer
> subscribes to a queue called ActiveMQ.Virtual.GroupName.TopicName, the
> Destination is not created until someone sends a message to it.

I think we should fix that.

I've just committed a patch to AbstractRegion so that when
addConsumer() is called the destination will be lazily created (unless
the subscription is itself a wildcard etc). I wonder if that helps fix
your issue? It's basically the same code that's called on
send() to auto-create the destination, so fingers crossed it solves
your problem.
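A toy version of that rule, to show the intended behaviour rather than the
actual AbstractRegion code: addConsumer() creates the destination through
the same lookup path as send(), except when the subscription name is a
wildcard. ToyRegion and the wildcard check on '*' and '>' segments are
assumptions made for the sketch.

```java
import java.util.Set;
import java.util.TreeSet;

/** Toy region: destinations are created on first use. */
final class ToyRegion {
    private final Set<String> destinations = new TreeSet<>();

    /** Assumes a wildcard is a whole path segment equal to '*' or '>'. */
    static boolean isWildcard(String name) {
        for (String segment : name.split("\\.")) {
            if (segment.equals("*") || segment.equals(">")) {
                return true;
            }
        }
        return false;
    }

    /** Creates the destination if it does not exist yet. */
    private void lookup(String name) {
        destinations.add(name);
    }

    void addConsumer(String destinationName) {
        if (!isWildcard(destinationName)) {
            lookup(destinationName); // same path a send() takes
        }
        // subscription bookkeeping omitted in this sketch
    }

    void send(String destinationName) {
        lookup(destinationName);
    }

    Set<String> destinations() {
        return destinations;
    }
}
```

After a consumer subscribes to ActiveMQ.Virtual.A.TEST the queue exists even
though nothing has been sent to it, while a wildcard subscription creates
nothing.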

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by bmadigan <bm...@orbitz.com>.
I think something will need to be done in addConsumer(...). When a consumer
subscribes to a queue called ActiveMQ.Virtual.GroupName.TopicName, the
Destination is not created until someone sends a message to it. 

The getDestinations(ActiveMQDestination) method returns nothing for
"ActiveMQ.Virtual.*.TEST" when a producer publishes to a Topic called
"TEST".  Should we override the lazy instantiation of ActiveMQDestinations
for consumers with the ActiveMQ.Virtual prefix, or just change the way
getDestinations(..) works to get destinations that have not been created?
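For reference, a minimal sketch of the kind of matching involved, assuming
'*' stands for exactly one dot-separated segment of a destination name.
DestinationWildcard is a hypothetical helper, not the broker's own matcher,
and it can only ever match names that have actually been created.

```java
/** Minimal matcher for dot-separated destination names. */
final class DestinationWildcard {
    /** '*' matches exactly one segment; anything else must match literally. */
    static boolean matches(String pattern, String name) {
        String[] p = pattern.split("\\.");
        String[] n = name.split("\\.");
        if (p.length != n.length) {
            return false; // every pattern segment consumes one name segment
        }
        for (int i = 0; i < p.length; i++) {
            if (!p[i].equals("*") && !p[i].equals(n[i])) {
                return false;
            }
        }
        return true;
    }
}
```

Under this rule ActiveMQ.Virtual.*.TEST matches ActiveMQ.Virtual.A.TEST,
whereas a pattern without the '.' before the topic name, such as
ActiveMQ.Virtual.*TEST, matches neither that queue nor any other group's.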


Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by James Strachan <ja...@gmail.com>.
BTW I've created a wiki page describing the idea of Virtual Topics and
to give a quick brain dump of the history of this thread...

http://activemq.org/site/virtual-destinations.html

by all means dive in and edit the page (the edit link at the bottom of
the page).

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by James Strachan <ja...@gmail.com>.
BTW let's move this thread over to the dev list :)


On 7/14/06, James Strachan <ja...@gmail.com> wrote:
> I thought I'd change the subject as really we are discussing a kind of
> Virtual Topics where folks can use Queues to subscribe and consume
> from them etc...
>
>
> On 7/13/06, bmadigan <bm...@orbitz.com> wrote:
> >
> > This is almost working,
>
> YAY!
>
> > there are a few things I need to fix:
> > - Need to figure out how to add the new Broker to the factory without using
> > the plugin loader
>
> I was thinking, we should maybe add the Virtual Topic interceptor to
> the broker by default, but allow the configuration to be overridden in
> the activemq.xml as I can't help thinking it'd be great to be able to use
> it out of the box.
>
> We've already got the "ActiveMQ.Advisory." prefix in topics to be used
> for advisories. So how about we use ActiveMQ.Virtual. as the default
> prefix for virtual topics & we let folks customize this if they don't
> like the defaults?
>
> So we should maybe add a VirtualDestinationPlugin property on the
> BrokerService which is created by default (unless explicitly disabled
> using a bean property) which would default to a sensible configuration
> that could be overridden, or folks could set the virtual destinations
> to something else if they prefer etc.
>
>
> > - It may not be a problem, but I'm synchronizing  on next when I create the
> > queues for the virtual groups in addConsumer().  This could be finer grained
> > I think.
>
> Am not sure if you need to do that; I think the default mechanism of
> clients consuming on the queue with the destination auto-created on
> the fly would be fine?
>
>
> > - I'm calling ConsumerInfo.setDestination(virtualQueue) to point the
> > consumer to a virtual queue. This is probably incorrect, not sure if there
> > is a better way.
>
> Are you trying to transform a durable topic consumer into a queue
> consumer? Am not sure you need to. Am thinking if you are a topic
> consumer (durable or non-durable) then we leave you as you are; a
> fully JMS compliant durable topic consumer. However to use the nice
> queue-centric virtual topics, you really use a queue consumer of the
> right name and things all just work. (BTW the topic regions are
> optimised so that if no consumers are available, publishing to a topic
> is a no-op)
>
>
> > - The virtual queues can't provide subscription recovery. Not sure how to
> > handle that.
>
> Yeah that's true. Am not too worried about that right now - but we
> could look at fixing that later on.
>
>
> > I created a BrokerFilter subclass which overrides addConsumer() and send():
> >
> >     public Subscription addConsumer(ConnectionContext cc,
> >                                     ConsumerInfo ci) throws Exception {
> >         synchronized(next){
> >             String name = ci.getDestination().getPhysicalName();
> >             if(name.startsWith(VIRTUAL)){
> >                 Set destinations = getDestinations(
> >                         new ActiveMQQueue(name));
> >                 if(destinations.size()==0){//create a new virtual queue
> >                     ActiveMQQueue queue = new ActiveMQQueue(
> >                             name+"?consumer.exclusive=true");
> >                     next.addDestination(cc,queue);
> >                     ci.setDestination(queue);
> >                 }else{ //queue exists, add the consumer
> >                     ActiveMQQueue queue = (ActiveMQQueue)
> >                             destinations.iterator().next();
> >                     ci.setDestination(queue);
> >                 }
> >             }
> >         }
> >         return next.addConsumer(cc, ci);
> >     }
> >
> >     public void send(ConnectionContext ctx,
> >                      Message message) throws Exception {
> >         String topic = message.getDestination().getPhysicalName();
> >         Iterator destinations = getDestinations(
> >                 new ActiveMQQueue(VIRTUAL + ".*." + topic)).iterator();
> >         while(destinations.hasNext()){
> >             Destination dest = (Destination) destinations.next();
> >             dest.send(ctx, message);
> >         }
> >         next.send(ctx, message);
> >     }
> >
> > Except for the subscription recovery part, this seems to work.
>
> Looks great - though am thinking we only really need the send() part -
> as the other stuff like adding consumers should just work out of the
> box (unless I'm missing something).
>
> BTW do you want to submit a patch, then we can start wiring this stuff
> in? Great work though - am excited to see this stuff implemented! :)
>
> --
>
> James
> -------
> http://radio.weblogs.com/0112098/
>


-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by James Strachan <ja...@gmail.com>.
On 7/14/06, bmadigan <bm...@orbitz.com> wrote:
>
>
>
> >> - I'm calling ConsumerInfo.setDestination(virtualQueue) to point the
> >> consumer to a virtual queue. This is probably incorrect, not sure if
> >> there is a better way.
> >
> > Are you trying to transform a durable topic consumer into a queue
> > consumer? Am not sure you need to. Am thinking if you are a topic
> > consumer (durable or non-durable) then we leave you as you are; a
> > fully JMS compliant durable topic consumer. However to use the nice
> > queue-centric virtual topics, you really use a queue consumer of the
> > right name and things all just work. (BTW the topic regions are
> > optimised so that if no consumers are available, publishing to a topic
> > is a no-op)
> >
>
> Yep, that's what I was attempting. If a subscriber signs up to a virtual
> topic, a queue is created for the virtual topic group. I was synching on
> next to make sure only one queue is created for any group of consumers.
> If I can just have the consumer subscribe to a Queue called
> ActiveMQ.Virtual.GroupName.TopicName, then I can just get rid of the
> addConsumer implementation completely. That makes it even simpler!

Agreed! Then it means that we're reusing all that code for
auto-creation of queues etc.
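
A minimal sketch of the naming convention discussed above, assuming the
proposed "ActiveMQ.Virtual." prefix and the GroupName.TopicName layout from
this thread. The class and method names are hypothetical and purely
illustrative; this is not ActiveMQ API, just plain string handling showing
how a consumer group could derive the queue name it subscribes to.

```java
/** Illustrative helper only; not part of ActiveMQ. */
public final class VirtualTopicNames {
    // Prefix proposed in this thread (an assumption, it may change).
    public static final String PREFIX = "ActiveMQ.Virtual.";

    private VirtualTopicNames() {}

    /** Queue name a consumer group would subscribe to for the given topic. */
    public static String queueFor(String group, String topic) {
        // The group must be a single path segment so the name parses back cleanly.
        if (group.isEmpty() || group.indexOf('.') >= 0) {
            throw new IllegalArgumentException(
                    "group must be a single non-empty segment: " + group);
        }
        return PREFIX + group + "." + topic;
    }

    /** True when queueName has the form ActiveMQ.Virtual.<group>.<topic>. */
    public static boolean isQueueFor(String queueName, String topic) {
        if (!queueName.startsWith(PREFIX)) {
            return false;
        }
        String rest = queueName.substring(PREFIX.length());
        int dot = rest.indexOf('.');
        // Everything after the first segment (the group) must equal the topic.
        return dot > 0 && rest.substring(dot + 1).equals(topic);
    }
}
```

Every consumer in the same group computes the same queue name, so the usual
auto-creation of queues yields one queue per group with no extra locking in
the interceptor.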

> >> - The virtual queues can't provide subscription recovery. Not sure how to
> >> handle that.
> >
> > Yeah that's true. Am not too worried about that right now - but we
> > could look at fixing that later on.
> >
>
> I think it will work now that I am not creating the Queues in the
> interceptor. I will test this since most of our teams implementing listeners
> require recovery.

OK.

BTW note that once a queue is created, messages will be delivered to
it for the virtual topic; so we'll have to add some kind of message
eviction for non-persistent messages at some point. (A quick fix could
be to make them persistent by default :)
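
To make the accumulation concern concrete, here is a rough in-memory sketch
of the send-side fan-out. It assumes the "ActiveMQ.Virtual.<group>.<topic>"
naming from this thread and treats "*" as exactly one name segment. The
class VirtualTopicFanOut and its methods are hypothetical stand-ins for the
broker, not ActiveMQ code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory stand-in for the broker; illustrative only. */
public final class VirtualTopicFanOut {
    static final String PREFIX = "ActiveMQ.Virtual.";

    // Queue name -> pending message bodies, kept in creation order.
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    /** Mimics auto-creation of a queue when its first consumer subscribes. */
    public void createQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    /** Copies the message to every existing queue for the topic; returns the copy count. */
    public int publish(String topic, String body) {
        int copies = 0;
        for (Map.Entry<String, Deque<String>> entry : queues.entrySet()) {
            if (matches(entry.getKey(), topic)) {
                entry.getValue().addLast(body);
                copies++;
            }
        }
        return copies;
    }

    /** Number of messages waiting on a queue; 0 if the queue does not exist. */
    public int depth(String name) {
        Deque<String> queue = queues.get(name);
        return queue == null ? 0 : queue.size();
    }

    /** Plays the role of the wildcard ActiveMQ.Virtual.*.<topic>. */
    static boolean matches(String queueName, String topic) {
        if (!queueName.startsWith(PREFIX)) {
            return false;
        }
        String rest = queueName.substring(PREFIX.length());
        int dot = rest.indexOf('.');
        return dot > 0 && rest.substring(dot + 1).equals(topic);
    }
}
```

In this sketch nothing ever drains the queues, so depth() keeps growing with
each publish once a queue exists. That is the situation where some eviction
policy (or persistence) for non-persistent messages would be needed.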

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by bmadigan <bm...@orbitz.com>.


>> - I'm calling ConsumerInfo.setDestination(virtualQueue) to point the
>> consumer to a virtual queue. This is probably incorrect, not sure if
>> there is a better way.
> 
> Are you trying to transform a durable topic consumer into a queue
> consumer? Am not sure you need to. Am thinking if you are a topic
> consumer (durable or non-durable) then we leave you as you are; a
> fully JMS compliant durable topic consumer. However to use the nice
> queue-centric virtual topics, you really use a queue consumer of the
> right name and things all just work. (BTW the topic regions are
> optimised so that if no consumers are available, publishing to a topic
> is a no-op)
> 

Yep, that's what I was attempting. If a subscriber signs up to a virtual
topic, a queue is created for the virtual topic group. I was synching on
next to make sure only one queue is created for any group of consumers.
If I can just have the consumer subscribe to a Queue called
ActiveMQ.Virtual.GroupName.TopicName, then I can just get rid of the
addConsumer implementation completely. That makes it even simpler!



>> - The virtual queues can't provide subscription recovery. Not sure how to
>> handle that.
> 
> Yeah that's true. Am not too worried about that right now - but we
> could look at fixing that later on.
> 

I think it will work now that I am not creating the Queues in the
interceptor. I will test this since most of our teams implementing listeners
require recovery. 

-- 
View this message in context: http://www.nabble.com/Virtual-Topics-%28was-Re%3A-Failover-topic-subscribers%29-tf1942381.html#a5331610
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by James Strachan <ja...@gmail.com>.
On 7/21/06, Sridhar Komandur <an...@komandur.com> wrote:
> Hi,
>
> Thanks for the Virtual Topic feature.
>
> We have a need for this feature now. Any ETA on this feature?

So it seems to be working fine in SVN trunk - could you try testing it
out in your environment to see if it's working for you?

> Please let me know if I can contribute in any way to expedite the work.

Currently testing is the highest priority; we could maybe create some
better test cases.

As I said in the previous message, at some point it'd be good to
provide some kind of custom XML to configure virtual queues and topics
in the broker.xml but for now having a hard-wired virtual topic tree
is a good first step.
-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Virtual Topics (was Re: Failover topic subscribers)

Posted by Sridhar Komandur <an...@komandur.com>.
Hi,

Thanks for the Virtual Topic feature.

We have a need for this feature now. Any ETA on this feature?
Please let me know if I can contribute in any way to expedite the work.

Thanks
Regards
- Sridhar Komandur


On 7/14/06, James Strachan <ja...@gmail.com> wrote:
>
> I thought I'd change the subject as really we are discussing a kind of
> Virtual Topics where folks can use Queues to subscribe and consume
> from them etc...
>
>
> On 7/13/06, bmadigan <bm...@orbitz.com> wrote:
> >
> > This is almost working,
>
> YAY!
>
> > there are a few things I need to fix:
> > - Need to figure out how to add the new Broker to the factory without
> > using the plugin loader
>
> I was thinking, we should maybe add the Virtual Topic interceptor to
> the broker by default, but allow the configuration to be overridden in
> the activemq.xml as I can't help thinking it'd be great to be able to use
> it out of the box.
>
> We've already got the "ActiveMQ.Advisory." prefix in topics to be used
> for advisories. So how about we use ActiveMQ.Virtual. as the default
> prefix for virtual topics & we let folks customize this if they don't
> like the defaults?
>
> So we should maybe add a VirtualDestinationPlugin property on the
> BrokerService which is created by default (unless explicitly disabled
> using a bean property) which would default to a sensible configuration
> that could be overridden, or folks could set the virtual destinations
> to something else if they prefer etc.
>
>
> > - It may not be a problem, but I'm synchronizing on next when I create
> > the queues for the virtual groups in addConsumer(). This could be finer
> > grained I think.
>
> Am not sure if you need to do that; I think the default mechanism of
> clients consuming on the queue with the destination auto-created on
> the fly would be fine?
>
>
> > - I'm calling ConsumerInfo.setDestination(virtualQueue) to point the
> > consumer to a virtual queue. This is probably incorrect, not sure if
> > there is a better way.
>
> Are you trying to transform a durable topic consumer into a queue
> consumer? Am not sure you need to. Am thinking if you are a topic
> consumer (durable or non-durable) then we leave you as you are; a
> fully JMS compliant durable topic consumer. However to use the nice
> queue-centric virtual topics, you really use a queue consumer of the
> right name and things all just work. (BTW the topic regions are
> optimised so that if no consumers are available, publishing to a topic
> is a no-op)
>
>
> > - The virtual queues can't provide subscription recovery. Not sure how
> > to handle that.
>
> Yeah that's true. Am not too worried about that right now - but we
> could look at fixing that later on.
>
>
> > I created a BrokerFilter subclass which overrides addConsumer() and
> > send():
> >
> >     public Subscription addConsumer(ConnectionContext cc,
> >                                     ConsumerInfo ci) throws Exception {
> >         synchronized(next){
> >             String name = ci.getDestination().getPhysicalName();
> >             if(name.startsWith(VIRTUAL)){
> >                 Set destinations = getDestinations(
> >                         new ActiveMQQueue(name));
> >                 if(destinations.size()==0){//create a new virtual queue
> >                     ActiveMQQueue queue = new ActiveMQQueue(
> >                             name+"?consumer.exclusive=true");
> >                     next.addDestination(cc,queue);
> >                     ci.setDestination(queue);
> >                 }else{ //queue exists, add the consumer
> >                     ActiveMQQueue queue = (ActiveMQQueue)
> >                             destinations.iterator().next();
> >                     ci.setDestination(queue);
> >                 }
> >             }
> >         }
> >         return next.addConsumer(cc, ci);
> >     }
> >
> >     public void send(ConnectionContext ctx,
> >                      Message message) throws Exception {
> >         String topic = message.getDestination().getPhysicalName();
> >         Iterator destinations = getDestinations(
> >                 new ActiveMQQueue(VIRTUAL + ".*." + topic)).iterator();
> >         while(destinations.hasNext()){
> >             Destination dest = (Destination) destinations.next();
> >             dest.send(ctx, message);
> >         }
> >         next.send(ctx, message);
> >     }
> >
> > Except for the subscription recovery part, this seems to work.
>
> Looks great - though am thinking we only really need the send() part -
> as the other stuff like adding consumers should just work out of the
> box (unless I'm missing something).
>
> BTW do you want to submit a patch, then we can start wiring this stuff
> in? Great work though - am excited to see this stuff implemented! :)
>
> --
>
> James
> -------
> http://radio.weblogs.com/0112098/
>