You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by Uday77 <ud...@gmail.com> on 2011/07/14 02:30:22 UTC

Unable to connect multiple consumers to a queue on fan out exchange

Hi All,

I have a fanout exchange and I am binding different queues to this exchange.
When I send data to this exchange each queue that is bound to this exchange
is getting data as expected. But when I try to connect multiple consumers to
one of these fanned out queues, I get the following message:
"Queue message_queue1 has an exclusive consumer. No more consumers allowed."

Here is the properties file that has the configuration:

fanout.properties:
--------------------
java.naming.factory.initial =
org.apache.qpid.jndi.PropertiesFileInitialContextFactory
connectionfactory.qpidConnectionfactory =
amqp://guest:guest@xxxxxxx/?brokerlist='tcp://xxxxx:5672'

# for producer
destination.fanoutQueue =
BURL:fanout://testcollector.fanout//message_queue?durable='false'&autodelete='true'&exclusive='false'

# for consumers
destination.fanoutQueue1 =
BURL:fanout://testcollector.fanout//message_queue1?durable='false'&autodelete='true'&exclusive='false'

destination.fanoutQueue2 =
BURL:fanout://testcollector.fanout//message_queue2?durable='false'&autodelete='true'&exclusive='false'

destination.fanoutQueue3 =
BURL:fanout://testcollector.fanout//message_queue3?durable='false'&autodelete='true'&exclusive='false'
-------------------------

I tried to connect two consumers to message_queue1 and I get the following
error:
INFO org.apache.qpid.client.AMQConnection - Closing AMQConnection due to
:org.apache.qpid.AMQException: ch=0 id=0
ExecutionException(errorCode=RESOURCE_LOCKED, commandId=6, classCode=4,
commandCode=7, fieldIndex=0, description=resource-locked: Queue
message_queue1 has an exclusive consumer. No more consumers allowed.
(qpid/broker/Queue.cpp:385), errorInfo={}) [error code 405: Already exists]

I checked the configuration on AMQP Server and it says that message_queue1
is a non-exclusive queue.
-bash-4.1$ qpid-stat -q
Queues
  queue                                  dur  autoDel  excl  msg   msgIn 
msgOut  bytes  bytesIn  bytesOut  cons  bind
  message_queue1                              Y                 1    22    
21      15    324      309         1     2


Here is the code I am using.

Producer.java: (queueName=fanoutQueue)
----------------
                Properties properties = new Properties();
		properties.load(this.getClass().getResourceAsStream("fanout.properties"));

		//Create the initial context
		Context ctx = new InitialContext(properties);

		// look up destination and connection factory
		Destination destination = (Destination)ctx.lookup(queueName);
		ConnectionFactory conFac =
(ConnectionFactory)ctx.lookup("qpidConnectionfactory");
		
		Connection connection = conFac.createConnection();
		Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
		
		MessageProducer messageProducer = session.createProducer(destination);
		TextMessage message;

		// Send a series of messages in a loop
		int i=0;
		while(true) {
			message = session.createTextMessage("Hello world! "+i);
		    messageProducer.send(message, DeliveryMode.NON_PERSISTENT,   
Message.DEFAULT_PRIORITY, 60*1000);
			i++;
			Thread.sleep(1000);
		}
-------------------

Consumer.java: (queueName=fanoutQueue1)
-----------------
                Properties properties = new Properties();
		properties.load(this.getClass().getResourceAsStream("fanout.properties"));

		//Create the initial context
		Context ctx = new InitialContext(properties);

		// look up destination and connection factory
		Destination destination = (Destination)ctx.lookup(queueName);
		ConnectionFactory conFac =
(ConnectionFactory)ctx.lookup("qpidConnectionfactory");
		
		Connection connection = conFac.createConnection();
		connection.setExceptionListener(new ExceptionListener()
		{
		    public void onException(JMSException jmse)
		    {
		        System.err.println(CLASS + ": The sample received an exception
through the ExceptionListener");
		        System.exit(0);
		    }
		});

		System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged
session");
		Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
		
		MessageConsumer messageConsumer = session.createConsumer(destination);
		connection.start();
		
		while(true) {
			TextMessage message = (TextMessage)messageConsumer.receive(1000);
			if(message != null)
				System.out.println(message.getText());
		}
-----------------

Am I missing something here?

I really appreciate your help.

Thanks,
Uday


--
View this message in context: http://apache-qpid-developers.2158895.n2.nabble.com/Unable-to-connect-multiple-consumers-to-a-queue-on-fan-out-exchange-tp6581419p6581419.html
Sent from the Apache Qpid developers mailing list archive at Nabble.com.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: Unable to connect multiple consumers to a queue on fan out exchange

Posted by Gordon Sim <gs...@redhat.com>.
On 07/14/2011 01:30 AM, Uday77 wrote:
> I have a fanout exchange and I am binding different queues to this exchange.
> When I send data to this exchange each queue that is bound to this exchange
> is getting data as expected. But when I try to connect multiple consumers to
> one of these fanned out queues, I get the following message:
> "Queue message_queue1 has an exclusive consumer. No more consumers allowed."

[snip]

> I tried to connect two consumers to message_queue1 and I get the following
> error:
> INFO org.apache.qpid.client.AMQConnection - Closing AMQConnection due to
> :org.apache.qpid.AMQException: ch=0 id=0
> ExecutionException(errorCode=RESOURCE_LOCKED, commandId=6, classCode=4,
> commandCode=7, fieldIndex=0, description=resource-locked: Queue
> message_queue1 has an exclusive consumer. No more consumers allowed.
> (qpid/broker/Queue.cpp:385), errorInfo={}) [error code 405: Already exists]
>
> I checked the configuration on AMQP Server and it says that message_queue1
> is a non-exclusive queue.

There are two levels of 'exclusivity' in AMQP, one is an exclusive queue 
the other is an exclusive consumer. As you point out this is not the 
former case, but the latter.

If you try qpid-stat -u you'll see the subscriptions marked as exclusive.

I'm not familiar enough with the format and handling of those JMS 
'binding urls' to comment on whether this is expected and whether there 
is another way to accomplish what you want. Anyone else able to help here?

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org