Posted to users@qpid.apache.org by Gordon Sim <gs...@redhat.com> on 2012/01/03 12:28:34 UTC

Re: Messaging scenarios

On 12/29/2011 07:00 AM, Kalle Kärkkäinen wrote:
> Hi,
>
> I'm developing a system using QPID. I'm considering the options for
> different queues, and would like some opinions and help with the issues
> I have run into.
>
> At the moment I'm using the C++ broker with Java clients.
>
> 1) Configuration queue
>
> I have a configuration message, and I need a queue to store it. Last
> Value Queue seems to be the ticket on this, especially if it would be
> durable. The queue is set up with qpid-config using the parameters
>
> --order=lvq --durable --limit-policy=ring --max-queue-count=1
>
> and it seems that after QPID restart, there are no messages there. Am I
> missing something?

As Bruno pointed out, you need to ensure the messages are also marked 
durable.

Also, in passing, it seems redundant to have a ring queue of 1 message 
and set the LVQ option. If there is only ever one message on the queue 
the LVQ matching will never be needed (each message will simply replace 
the existing one, regardless of the key).

If you do want to use LVQ I would also recommend using 
qpid.last_value_queue_key to set the property to key on. That will also 
configure more intuitive behaviour in place of the old (now legacy) LVQ 
implementation. E.g. the qpid-config parameters might be: --argument 
qpid.last_value_queue_key=my-property.
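
To make the keying behaviour concrete, here is a minimal sketch of last-value semantics as a toy in-memory model. It is not the broker's implementation; the class name ToyLastValueQueue and its methods are hypothetical, and the key property name my-property is simply reused from the example above. Where a replaced message sits in the queue is a simplification of this sketch, not a statement about the broker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of last-value-queue semantics; hypothetical, not Qpid code. */
class ToyLastValueQueue {
    // Name of the message property used as the key
    // (plays the role of qpid.last_value_queue_key).
    private final String keyProperty;

    // At most one body is retained per key value.
    private final Map<String, String> latestByKey = new LinkedHashMap<>();

    ToyLastValueQueue(String keyProperty) {
        this.keyProperty = keyProperty;
    }

    /** A newer message replaces an older one that carries the same key value. */
    void publish(Map<String, String> properties, String body) {
        String key = properties.get(keyProperty);
        latestByKey.put(key, body);
    }

    /** Snapshot of the bodies currently held by the queue. */
    List<String> contents() {
        return new ArrayList<>(latestByKey.values());
    }

    public static void main(String[] args) {
        ToyLastValueQueue queue = new ToyLastValueQueue("my-property");
        queue.publish(Map.of("my-property", "server1"), "config-v1");
        queue.publish(Map.of("my-property", "server2"), "config-A");
        queue.publish(Map.of("my-property", "server1"), "config-v2"); // replaces config-v1
        System.out.println(queue.contents());
    }
}
```

If every message carried the same key value, the model would hold a single message at all times, which is why a ring queue limited to one message makes the LVQ option redundant.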

> 2) one-to-many messaging
>
> Earlier (years back, m4 time) I asked the same thing, and I suspect the
> same answer: The way to achieve one-to-many messaging is pubsub setup
> with private temporary queues with routing rules to move messages from
> pub-queue to said sub-queues. Is this still the case?

There is no pub-queue, there is (in AMQP 0-10) an 'exchange' to which 
the temporary queues are all bound. Publishers send messages to this 
exchange and all queues with a matching binding receive a copy of it.
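
As a rough illustration of that routing model, the sketch below uses plain Java collections. ToyExchange, bindNewQueue and publish are hypothetical names, and matching is by exact key only, whereas a real topic exchange also supports wildcard patterns.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of an exchange with bound queues; hypothetical, not the Qpid API. */
class ToyExchange {
    /** A binding ties one queue to the exchange under a binding key. */
    private static final class Binding {
        final String key;
        final Queue<String> queue;

        Binding(String key, Queue<String> queue) {
            this.key = key;
            this.queue = queue;
        }
    }

    private final List<Binding> bindings = new ArrayList<>();

    /** Create a private queue for one subscriber and bind it under the given key. */
    Queue<String> bindNewQueue(String bindingKey) {
        Queue<String> queue = new ArrayDeque<>();
        bindings.add(new Binding(bindingKey, queue));
        return queue;
    }

    /** Every queue whose binding key equals the routing key receives a copy. */
    void publish(String routingKey, String body) {
        for (Binding binding : bindings) {
            if (binding.key.equals(routingKey)) {
                binding.queue.add(body);
            }
        }
    }

    public static void main(String[] args) {
        ToyExchange exchange = new ToyExchange();
        Queue<String> first = exchange.bindNewQueue("configuration.server1");
        Queue<String> second = exchange.bindNewQueue("configuration.server1");
        exchange.publish("configuration.server1", "hello");
        System.out.println(first.peek() + " / " + second.peek());
    }
}
```

The publisher only ever names the exchange and a routing key; which queues exist and how they are bound is entirely the subscribers' concern.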

If you are using JMS, this corresponds to a Topic type Destination. 
There are various ways to configure that, but I think the simplest is to 
create an exchange for each logical topic and simply use the exchange 
name to define/create the Destination. That way everything is taken care 
of by the JMS library.

> The real world case here is that we are syncing servers with messages,
> sending them to federated QPIDs and processing the data only after it
> has been through the federation (passed to all QPID nodes). Is this the
> way to go? A Corosync setup would be difficult because of the broadcast
> requirement; the QPID nodes are not in the same network.
>
> 3) many-to-many messaging
>
> There are times when we have multiple pubs in the pubsub scheme. How do
> we do this? Would it be more routing inside the QPID, or what?

Really this boils down to the same case as one-to-many. You have a Topic 
and you can have one or more MessageProducers (in JMS) created for it.

The one special aspect you may want to consider is total ordering. I.e. 
do you need all subscribers to see the messages in the exact same order?

In the case of a single producer this will always be the case as the 
messages are causally ordered anyway. However with multiple concurrent 
producers the interleaving of messages from different producers may be 
different on different queues. To work around this you can make use of 
an extended option on the C++ broker that synchronises all publications 
to ensure each subscriber sees the exact same sequence. This of course 
does imply some penalty in throughput due to reduced concurrency.
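
The trade-off can be sketched with a toy model in which a single lock covers the whole fan-out of each publication. This is only an illustration of the idea, not how the broker implements the option; ToySequencedTopic and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of totally ordered fan-out; hypothetical, not the broker's mechanism. */
class ToySequencedTopic {
    private final List<List<String>> subscriberQueues = new ArrayList<>();

    /** Register a subscriber and return the queue it will read from. */
    synchronized List<String> subscribe() {
        List<String> queue = new ArrayList<>();
        subscriberQueues.add(queue);
        return queue;
    }

    /**
     * One lock covers delivery to every queue, so two concurrent publications
     * can never be interleaved differently on different queues.
     */
    synchronized void publish(String body) {
        for (List<String> queue : subscriberQueues) {
            queue.add(body);
        }
    }

    /** Run two concurrent producers and return the contents of both subscriber queues. */
    static List<List<String>> runTwoProducers(int perProducer) {
        ToySequencedTopic topic = new ToySequencedTopic();
        List<String> first = topic.subscribe();
        List<String> second = topic.subscribe();
        Thread p1 = new Thread(() -> {
            for (int i = 0; i < perProducer; i++) topic.publish("p1-" + i);
        });
        Thread p2 = new Thread(() -> {
            for (int i = 0; i < perProducer; i++) topic.publish("p2-" + i);
        });
        p1.start();
        p2.start();
        try {
            p1.join();
            p2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        }
        return List.of(first, second);
    }

    public static void main(String[] args) {
        List<List<String>> queues = runTwoProducers(1000);
        System.out.println("same order: " + queues.get(0).equals(queues.get(1)));
    }
}
```

Because both producers contend for the same lock on every publication, they are effectively serialised, which is where the throughput penalty comes from.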

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: Messaging scenarios

Posted by Kalle <ka...@airdice.com>.
Hi,

It seems that whatever magic I was missing was bundled into using the straight-up JMS API.

What they did here:

http://www.novell.com/documentation/extend52/Docs/help/MP/jms/tutorial/durable-1.htm

helped me along. Thanks all! 

-- 
Kalle





Fw: Messaging scenarios

Posted by Kalle <ka...@airdice.com>.
Forwarding a wrongly addressed message to the users list.

-- 
Kalle


Forwarded message:

> From: Kalle <ka...@airdice.com>
> To: Gordon Sim <gs...@redhat.com>
> Date: Tuesday 3. January 2012 14.26.38
> Subject: Re: Messaging scenarios
> 
> 
> Thanks for all the answers so far!
> 
> Here is some of my code (edited for clarity; sorry that it will not compile, though). The situation has evolved: I am now trying to define the queues in application code.
> 
> 
> String qpid = "amqp...";
> AMQConnectionFactory connectionFactory = new AMQConnectionFactory(qpid);
> AMQConnection connection = (AMQConnection) connectionFactory.createConnection();
> connection.start();
> AMQSession_0_10 session = (AMQSession_0_10) connection.createSession(TRANSACTIONS, Session.CLIENT_ACKNOWLEDGE, 1);
> 
> // Binding URL: exchange "configuration", queue options passed as URL parameters
> AMQBindingURL url = new AMQBindingURL("topic://configuration/server1?" +
>     "exclusive='false'&qpid.last_value_queue='true'&qpid.last_value_queue_key='configuration'");
> 
> Topic config = new AMQTopic(url);
> 
> MessageConsumer configconsumer = session.createSubscriber(config);
> 
> // Wait up to one second for an existing configuration message
> Message configMessage = configconsumer.receive(1000L);
> 
> if (configMessage == null) {
>     LOGGER.info("Configuration was not found in QPID, seeding default");
> 
>     // .. create configuration object ..
> 
>     BytesMessage m = session.createBytesMessage();
>     // Property named by qpid.last_value_queue_key above
>     m.setStringProperty("configuration", "server1");
>     m.writeBytes(configuration.toByteArray());
>     MessageProducer configproducer = session.createPublisher(config);
>     // Persistent delivery mode, default priority, no expiry
>     configproducer.send(m, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, 0L);
>     Thread.sleep(100L);
>     configproducer.close();
> }
> 
> 
> So I'm trying to use the AMQP API instead of JMS (although it seems an unholy combination of both in the end).
> 
> 
> At the moment this does not work; if I run it twice, both applications log 'configuration not found'.
> 
> 
> On Tuesday 3. January 2012 at 13.28, Gordon Sim wrote:
> > As Bruno pointed out, you need to ensure the messages are also marked
> > durable.
> 
> I assume I can achieve this by sending it with persistent delivery mode?
> 
> > Also, in passing, it seems redundant to have a ring queue of 1 message
> > and set the LVQ option. If there is only ever one message on the queue 
> > the LVQ matching will never be needed (each message will simply replace 
> > the existing one, regardless of the key).
> 
> Agreed. I think in the future I would have multiple different configuration objects, thus not needing either ring or size definitions. The idea is to have an exchange:
> 
> topic://configurations/
> 
> with multiple underlying topics
> 
> topic://configuration/server1
> topic://configuration/server2/service1
> 
> et cetera. In this scenario LVQ may make more sense?
> 
> > There is no pub-queue, there is (in AMQP 0-10) an 'exchange' to which 
> > the temporary queues are all bound. Publishers send message to this 
> > exchange and all queues with a matching binding receive a copy of it.
> 
> In BindingURL this would mean a binding parameter with the topic name? 
> 
> 
> 
> Thanks!
> 
> --
> Kalle.
>