You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/23 17:54:15 UTC

svn commit: r531513 - in /incubator/qpid/branches/M2/java: broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java

Author: ritchiem
Date: Mon Apr 23 08:54:15 2007
New Revision: 531513

URL: http://svn.apache.org/viewvc?view=rev&rev=531513
Log:
QPID-436 - topic exchange doesn't obey the mandatory flag

Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=531513&r1=531512&r2=531513
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Mon Apr 23 08:54:15 2007
@@ -57,19 +57,16 @@
 
     private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
 
-    /**
-     *  DestWildExchangeMBean class implements the management interface for the
-     *  Topic exchanges.
-     */
+    /** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */
     @MBeanDescription("Management Bean for Topic Exchange")
     private final class DestWildExchangeMBean extends ExchangeMBean
     {
         // open mbean data types for representing exchange bindings
-        private String[]   _bindingItemNames = {"Routing Key", "Queue Names"};
-        private String[]   _bindingItemIndexNames = {_bindingItemNames[0]};
+        private String[] _bindingItemNames = {"Routing Key", "Queue Names"};
+        private String[] _bindingItemIndexNames = {_bindingItemNames[0]};
         private OpenType[] _bindingItemTypes = new OpenType[2];
-        private CompositeType      _bindingDataType = null;
-        private TabularType        _bindinglistDataType = null;
+        private CompositeType _bindingDataType = null;
+        private TabularType _bindinglistDataType = null;
         private TabularDataSupport _bindingList = null;
 
         @MBeanConstructor("Creates an MBean for AMQ topic exchange")
@@ -80,22 +77,18 @@
             init();
         }
 
-        /**
-         * initialises the OpenType objects.
-         */
+        /** initialises the OpenType objects. */
         private void init() throws OpenDataException
         {
             _bindingItemTypes[0] = SimpleType.STRING;
             _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
             _bindingDataType = new CompositeType("Exchange Binding", "Routing key and Queue names",
-                                         _bindingItemNames, _bindingItemNames, _bindingItemTypes);
+                                                 _bindingItemNames, _bindingItemNames, _bindingItemTypes);
             _bindinglistDataType = new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(),
-                                         _bindingDataType, _bindingItemIndexNames);
+                                                   _bindingDataType, _bindingItemIndexNames);
         }
 
-        /**
-         * returns exchange bindings in tabular form
-         */
+        /** returns exchange bindings in tabular form */
         public TabularData bindings() throws OpenDataException
         {
             _bindingList = new TabularDataSupport(_bindinglistDataType);
@@ -122,7 +115,9 @@
         {
             AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
             if (queue == null)
+            {
                 throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
+            }
 
             try
             {
@@ -159,7 +154,7 @@
         {
             queueList.add(queue);
         }
-        else if(_logger.isDebugEnabled())
+        else if (_logger.isDebugEnabled())
         {
             _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
         }
@@ -176,10 +171,18 @@
         // TODO: add support for the immediate flag
         if (queues == null)
         {
-            _logger.warn("No queues found for routing key " + routingKey);
-            _logger.warn("Routing map contains: " + _routingKey2queues);
-            //todo Check for valid topic - mritchie
-            return;
+            if (info.isMandatory())
+            {
+                String msg = "Topic " + routingKey + " is not known to " + this;
+                throw new NoRouteException(msg, payload);
+            }
+            else
+            {
+                _logger.warn("No queues found for routing key " + routingKey);
+                _logger.warn("Routing map contains: " + _routingKey2queues);
+                //todo Check for valid topic - mritchie
+                return;
+            }
         }
 
         for (AMQQueue q : queues)
@@ -245,7 +248,7 @@
         }
     }
 
-    protected ExchangeMBean createMBean()  throws AMQException
+    protected ExchangeMBean createMBean() throws AMQException
     {
         try
         {

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=diff&rev=531513&r1=531512&r2=531513
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Mon Apr 23 08:54:15 2007
@@ -22,6 +22,8 @@
     private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
 
     private final List<Message> _bouncedMessageList = Collections.synchronizedList(new ArrayList<Message>());
+    private static final String VIRTUALHOST = "test";
+    private static final String BROKER = "vm://:1";
 
     static
     {
@@ -53,10 +55,10 @@
      *
      * @throws Exception
      */
-    public void testReturnUnroutableMandatoryMessage() throws Exception
+    public void testReturnUnroutableMandatoryMessage_HEADERS() throws Exception
     {
         _bouncedMessageList.clear();
-        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+        Connection con = new AMQConnection(BROKER, "guest", "guest", "consumer1", VIRTUALHOST);
 
 
         AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -70,7 +72,7 @@
         //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
         // This is the default now
 
-        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+        Connection con2 = new AMQConnection(BROKER, "guest", "guest", "producer1", VIRTUALHOST);
 
         con2.setExceptionListener(this);
         AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -126,6 +128,138 @@
 
 
     }
+
+    public void testReturnUnroutableMandatoryMessage_QUEUE() throws Exception
+    {
+        _bouncedMessageList.clear();
+        Connection con = new AMQConnection(BROKER, "guest", "guest", "consumer1", VIRTUALHOST);
+
+
+        AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        AMQQueue valid_queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, "testReturnUnroutableMandatoryMessage_QUEUE");
+        AMQQueue invalid_queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, "testReturnUnroutableMandatoryMessage_QUEUE_INVALID");
+        MessageConsumer consumer = consumerSession.createConsumer(valid_queue);
+
+        //force synch to ensure the consumer has resulted in a bound queue
+        //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+        // This is the default now
+
+        Connection con2 = new AMQConnection(BROKER, "guest", "guest", "producer1", VIRTUALHOST);
+
+        con2.setExceptionListener(this);
+        AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        // Need to start the "producer" connection in order to receive bounced messages
+        _logger.info("Starting producer connection");
+        con2.start();
+
+
+        MessageProducer nonMandatoryProducer = producerSession.createProducer(valid_queue, false, false);
+        MessageProducer mandatoryProducer = producerSession.createProducer(invalid_queue);
+
+        // First test - should be routed
+        _logger.info("Sending non-mandatory message");
+        TextMessage msg1 = producerSession.createTextMessage("msg1");
+        nonMandatoryProducer.send(msg1);
+
+        // Second test - should be bounced
+        _logger.info("Sending non-routable mandatory message");
+        TextMessage msg2 = producerSession.createTextMessage("msg2");
+        mandatoryProducer.send(msg2);
+
+
+        _logger.info("Starting consumer connection");
+        con.start();
+        TextMessage tm = (TextMessage) consumer.receive(1000L);
+
+        assertTrue("No message routed to receiver", tm != null);
+        assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg1".equals(tm.getText()));
+
+        try
+        {
+            Thread.sleep(1000L);
+        }
+        catch (InterruptedException e)
+        {
+            ;
+        }
+
+        assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
+        Message m = _bouncedMessageList.get(0);
+        assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
+
+
+        con.close();
+        con2.close();
+    }
+
+
+    public void testReturnUnroutableMandatoryMessage_TOPIC() throws Exception
+    {
+        _bouncedMessageList.clear();
+        Connection con = new AMQConnection(BROKER, "guest", "guest", "consumer1", VIRTUALHOST);
+
+
+        AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        AMQTopic valid_topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, "test.Return.Unroutable.Mandatory.Message.TOPIC");
+        AMQTopic invalid_topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, "test.Return.Unroutable.Mandatory.Message.TOPIC.invalid");
+        MessageConsumer consumer = consumerSession.createConsumer(valid_topic);
+
+        //force synch to ensure the consumer has resulted in a bound queue
+        //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+        // This is the default now
+
+        Connection con2 = new AMQConnection(BROKER, "guest", "guest", "producer1", VIRTUALHOST);
+
+        con2.setExceptionListener(this);
+        AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        // Need to start the "producer" connection in order to receive bounced messages
+        _logger.info("Starting producer connection");
+        con2.start();
+
+
+        MessageProducer nonMandatoryProducer = producerSession.createProducer(valid_topic, false, false);
+        MessageProducer mandatoryProducer = producerSession.createProducer(invalid_topic);
+
+        // First test - should be routed
+        _logger.info("Sending non-mandatory message");
+        TextMessage msg1 = producerSession.createTextMessage("msg1");
+        nonMandatoryProducer.send(msg1);
+
+        // Second test - should be bounced
+        _logger.info("Sending non-routable mandatory message");
+        TextMessage msg2 = producerSession.createTextMessage("msg2");
+        mandatoryProducer.send(msg2);
+
+
+        _logger.info("Starting consumer connection");
+        con.start();
+        TextMessage tm = (TextMessage) consumer.receive(1000L);
+
+        assertTrue("No message routed to receiver", tm != null);
+        assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg1".equals(tm.getText()));
+
+        try
+        {
+            Thread.sleep(1000L);
+        }
+        catch (InterruptedException e)
+        {
+            ;
+        }
+
+        assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
+        Message m = _bouncedMessageList.get(0);
+        assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
+
+
+        con.close();
+        con2.close();
+    }
+
 
     public static junit.framework.Test suite()
     {