You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/05/15 18:19:02 UTC

svn commit: r538240 - in /incubator/qpid/branches/M2/java: broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/test/java/org/apache/qpid/server/exchange/ systests/src/main/java/org/apache/qpid/server/exchange/

Author: ritchiem
Date: Tue May 15 09:19:01 2007
New Revision: 538240

URL: http://svn.apache.org/viewvc?view=rev&rev=538240
Log:
QPID-3 Topic Matching with tests

A simple naive approach. Similar to C++ to be included for M2.

More elaborate pre-evaluated version will have to wait.
Once benchmarks have been performed we can evaluate the performance advantages, if any, of that approach.

Added:
    incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java   (with props)
Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=538240&r1=538239&r2=538240
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Tue May 15 09:19:01 2007
@@ -23,6 +23,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.LinkedList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
@@ -56,6 +58,10 @@
     private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
 
     private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+    //    private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
+    private static final String TOPIC_SEPARATOR = ".";
+    private static final String AMQP_STAR = "*";
+    private static final String AMQP_HASH = "#";
 
     /** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */
     @MBeanDescription("Management Bean for Topic Exchange")
@@ -78,7 +84,7 @@
                 AMQShortString key = entry.getKey();
                 List<String> queueList = new ArrayList<String>();
 
-                List<AMQQueue> queues = entry.getValue();
+                List<AMQQueue> queues = getMatchedQueues(key);
                 for (AMQQueue q : queues)
                 {
                     queueList.add(q.getName().toString());
@@ -118,10 +124,13 @@
         return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
     }
 
-    public synchronized void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
     {
         assert queue != null;
-        assert routingKey != null;
+        assert rKey != null;
+
+        AMQShortString routingKey = normalize(rKey);
+
         _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
         // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
         List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
@@ -142,15 +151,67 @@
 
     }
 
+    /**
+     * Rewrites a routing or binding key into a canonical form so equivalent wildcard patterns compare equal.
+     * Empty levels are dropped, "#.#" collapses to "#", and "#.*" is reordered to "*.#".
+     * @param routingKey the dot-separated key to normalise
+     * @return the normalised key; the empty key when routingKey holds no levels at all
+     */
+    private AMQShortString normalize(AMQShortString routingKey)
+    {
+        StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
+        List<String> subscription = new ArrayList<String>();
+
+        while (routingTokens.hasMoreTokens())
+        {
+            subscription.add(routingTokens.nextToken());
+        }
+
+        // Every rule inspects a level together with its successor, so stop one short of the end.
+        int index = 0;
+        while (index + 1 < subscription.size())
+        {
+            if (subscription.get(index).equals(AMQP_HASH))
+            {
+                String following = subscription.get(index + 1);
+                if (following.equals(AMQP_HASH))
+                {
+                    // "#.#" matches nothing that "#" does not: drop one and re-check this position
+                    subscription.remove(index);
+                    continue;
+                }
+                if (following.equals(AMQP_STAR))
+                {
+                    // "#.*" becomes "*.#"; the '#' now sits at index + 1 and is examined next
+                    subscription.add(index + 1, subscription.remove(index));
+                }
+            }
+            index++;
+        }
+
+        StringBuilder sb = new StringBuilder();
+        for (String level : subscription)
+        {
+            // separator precedes every level but the first, so a key with no levels yields "" instead of throwing
+            if (sb.length() > 0)
+            {
+                sb.append(TOPIC_SEPARATOR);
+            }
+            sb.append(level);
+        }
+        return new AMQShortString(sb.toString());
+    }
+
     public void route(AMQMessage payload) throws AMQException
     {
         MessagePublishInfo info = payload.getMessagePublishInfo();
 
-        final AMQShortString routingKey = info.getRoutingKey();
-        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        final AMQShortString routingKey = normalize(info.getRoutingKey());
+
+        List<AMQQueue> queues = getMatchedQueues(routingKey);
         // if we have no registered queues we have nothing to do
         // TODO: add support for the immediate flag
-        if (queues == null)
+        if (queues == null || queues.size() == 0)
         {
             if (info.isMandatory())
             {
@@ -177,14 +238,14 @@
 
     public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
     {
-        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
         return queues != null && queues.contains(queue);
     }
 
 
     public boolean isBound(AMQShortString routingKey) throws AMQException
     {
-        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
         return queues != null && !queues.isEmpty();
     }
 
@@ -205,10 +266,12 @@
         return !_routingKey2queues.isEmpty();
     }
 
-    public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
     {
         assert queue != null;
-        assert routingKey != null;
+        assert rKey != null;
+
+        AMQShortString routingKey = normalize(rKey);
 
         List<AMQQueue> queues = _routingKey2queues.get(routingKey);
         if (queues == null)
@@ -240,5 +303,111 @@
             _logger.error("Exception occured in creating the topic exchenge mbean", ex);
             throw new AMQException("Exception occured in creating the topic exchenge mbean", ex);
         }
+    }
+
+
+    /**
+     * Returns the queues of every binding whose pattern matches the given routing key.
+     * A queue bound under several matching patterns is added once per pattern.
+     *
+     * @param routingKey the dot-separated key to test against each registered binding
+     * @return a new, possibly empty list of matching queues; never null
+     */
+    private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
+    {
+        List<AMQQueue> list = new LinkedList<AMQQueue>();
+        List<String> routingkeyList = new ArrayList<String>();
+
+        StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
+        while (routingTokens.hasMoreTokens())
+        {
+            String next = routingTokens.nextToken();
+            // Collapse "#.#" to "#". The isEmpty() check matters when the key starts with '#':
+            // without it the lookup below would ask for element -1 and throw.
+            if (next.equals(AMQP_HASH) && !routingkeyList.isEmpty()
+                && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH))
+            {
+                continue;
+            }
+
+            routingkeyList.add(next);
+        }
+
+        // Iterate entries rather than keys so each queue list is read together with its pattern
+        // instead of through a second map lookup.
+        for (Map.Entry<AMQShortString, List<AMQQueue>> binding : _routingKey2queues.entrySet())
+        {
+            List<String> queueList = new ArrayList<String>();
+
+            StringTokenizer queTok = new StringTokenizer(binding.getKey().toString(), TOPIC_SEPARATOR);
+            while (queTok.hasMoreTokens())
+            {
+                queueList.add(queTok.nextToken());
+            }
+
+            if (matchesPattern(queueList, routingkeyList))
+            {
+                list.addAll(binding.getValue());
+            }
+        }
+
+        return list;
+    }
+
+    /**
+     * Tests one tokenised binding pattern against a tokenised routing key.
+     * In the pattern '*' stands for exactly one level and '#' for any number of levels,
+     * including none; every other token must equal the key's level at that position.
+     *
+     * Backtracking is required for a '#' that is not last: ending it at the first level equal
+     * to the token after it would reject "a.b.c.b" for "a.#.b". A failed attempt therefore
+     * returns to the most recent '#' and lets it take one more level before trying again.
+     *
+     * @param pattern the levels of the binding pattern
+     * @param key the levels of the routing key; these are only ever compared for equality
+     * @return true if the whole key is matched by the whole pattern
+     */
+    private static boolean matchesPattern(List<String> pattern, List<String> key)
+    {
+        int p = 0;           // next pattern level to satisfy
+        int k = 0;           // next key level to consume
+        int hashIndex = -1;  // position of the most recent '#' seen in the pattern, -1 if none
+        int hashEnd = 0;     // first key level NOT currently swallowed by that '#'
+
+        while (k < key.size())
+        {
+            String want = p < pattern.size() ? pattern.get(p) : null;
+
+            if (AMQP_HASH.equals(want))
+            {
+                // start by letting '#' match nothing; it grows only if that fails later
+                hashIndex = p++;
+                hashEnd = k;
+            }
+            else if (AMQP_STAR.equals(want) || key.get(k).equals(want))
+            {
+                p++;
+                k++;
+            }
+            else if (hashIndex >= 0)
+            {
+                // mismatch (or pattern used up): give the last '#' one more level and retry
+                hashEnd++;
+                k = hashEnd;
+                p = hashIndex + 1;
+            }
+            else
+            {
+                return false;
+            }
+        }
+
+        // key used up: whatever pattern is left may only be '#', which can match nothing
+        while (p < pattern.size() && pattern.get(p).equals(AMQP_HASH))
+        {
+            p++;
+        }
+
+        return p == pattern.size();
     }
 }

Added: incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?view=auto&rev=538240
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (added)
+++ incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Tue May 15 09:19:01 2007
@@ -0,0 +1,294 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Exercises topic routing in DestWildExchange: every test binds one queue under a pattern and
+ * checks which routing keys are delivered to it and which are rejected.
+ */
+public class DestWildExchangeTest extends TestCase
+{
+
+    DestWildExchange _exchange;
+
+    VirtualHost _vhost;
+    MessageStore _store;
+    StoreContext _context;
+
+
+    /** Builds a fresh exchange, message store and store context before each test. */
+    public void setUp() throws AMQException
+    {
+        _exchange = new DestWildExchange();
+        _vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
+        _store = new MemoryMessageStore();
+        _context = new StoreContext();
+    }
+
+
+    /** A mandatory message whose key matches no binding must make route() throw NoRouteException. */
+    public void testNoRoute() throws AMQException
+    {
+        AMQQueue queue = new AMQQueue(new AMQShortString("a*#b"), false, null, false, _vhost);
+        _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
+
+
+        MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b"));
+
+        AMQMessage message = new AMQMessage(0L, info, null);
+
+        try
+        {
+            _exchange.route(message);
+            fail("Message has no route and shouldn't be routed");
+        }
+        catch (NoRouteException nre)
+        {
+            //normal   
+        }
+
+        Assert.assertEquals(0, queue.getMessageCount());
+    }
+
+    /** A pattern without wildcards matches only the identical key. */
+    public void testDirectMatch() throws AMQException
+    {
+        AMQQueue queue = bind("ab", "a.b");
+
+        assertRouted(queue, "a.b");
+        assertNotRouted(queue, "a.c");
+    }
+
+    /** '*' stands for exactly one level: it matches any single level but not a missing one. */
+    public void testStarMatch() throws AMQException
+    {
+        AMQQueue queue = bind("a*", "a.*");
+
+        assertRouted(queue, "a.b");
+        assertRouted(queue, "a.c");
+        assertNotRouted(queue, "a");
+    }
+
+    /** A trailing '#' matches any number of further levels, including none. */
+    public void testHashMatch() throws AMQException
+    {
+        AMQQueue queue = bind("a#", "a.#");
+
+        assertRouted(queue, "a.b.c");
+        assertRouted(queue, "a.b");
+        assertRouted(queue, "a.c");
+        assertRouted(queue, "a");
+        assertNotRouted(queue, "b");
+    }
+
+    /** A '#' in the middle of a pattern may cover one level or none. */
+    public void testMidHash() throws AMQException
+    {
+        AMQQueue queue = bind("a", "a.*.#.b");
+
+        assertRouted(queue, "a.c.d.b");
+        assertRouted(queue, "a.c.b");
+    }
+
+    /** Every literal level after a mid-pattern '#' still has to be present, in order, at the end. */
+    public void testMatchafterHash() throws AMQException
+    {
+        AMQQueue queue = bind("a#", "a.*.#.b.c");
+
+        assertNotRouted(queue, "a.c.b.b");
+        assertRouted(queue, "a.a.b.c");
+        assertNotRouted(queue, "a.b.c.b");
+        assertRouted(queue, "a.b.c.b.c");
+    }
+
+    /** A pattern may contain more than one '#', separated by literal levels. */
+    public void testHashAfterHash() throws AMQException
+    {
+        AMQQueue queue = bind("a#", "a.*.#.b.c.#.d");
+
+        assertNotRouted(queue, "a.c.b.b.c");
+        assertRouted(queue, "a.a.b.c.d");
+    }
+
+    /** '#' on both sides of a '*' still requires the trailing literal level. */
+    public void testHashHash() throws AMQException
+    {
+        AMQQueue queue = bind("a#", "a.#.*.#.d");
+
+        assertNotRouted(queue, "a.c.b.b.c");
+        assertRouted(queue, "a.a.b.c.d");
+    }
+
+    /** A key that is only a prefix of a wildcard-free pattern does not match. */
+    public void testSubMatchFails() throws AMQException
+    {
+        AMQQueue queue = bind("a", "a.b.c.d");
+
+        assertNotRouted(queue, "a.b.c");
+    }
+
+    /** A key with more levels than a wildcard-free pattern does not match. */
+    public void testMoreRouting() throws AMQException
+    {
+        AMQQueue queue = bind("a", "a.b");
+
+        assertNotRouted(queue, "a.b.c");
+    }
+
+    /** A key with fewer levels than a wildcard-free pattern does not match. */
+    public void testMoreQueue() throws AMQException
+    {
+        AMQQueue queue = bind("a", "a.b");
+
+        assertNotRouted(queue, "a");
+    }
+
+    /**
+     * Creates a queue and binds it to the exchange under test.
+     *
+     * @param queueName name for the new queue
+     * @param bindingKey topic pattern to register the queue under
+     * @return the newly bound queue
+     */
+    private AMQQueue bind(String queueName, String bindingKey) throws AMQException
+    {
+        AMQQueue queue = new AMQQueue(new AMQShortString(queueName), false, null, false, _vhost);
+        _exchange.registerQueue(new AMQShortString(bindingKey), queue, null);
+        return queue;
+    }
+
+    /**
+     * Publishes one message with the given key and checks that exactly that message reaches the
+     * queue. The message is removed again so that the queue is empty for the next check.
+     */
+    private void assertRouted(AMQQueue queue, String routingKey) throws AMQException
+    {
+        AMQMessage message = createMessage(routingKey);
+
+        try
+        {
+            _exchange.route(message);
+            message.routingComplete(_store, _context, new MessageHandleFactory());
+        }
+        catch (AMQException e)
+        {
+            fail("Message with key '" + routingKey + "' has a route and should be routed: " + e);
+        }
+
+        Assert.assertEquals(1, queue.getMessageCount());
+
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+
+        queue.deleteMessageFromTop(_context);
+        Assert.assertEquals(0, queue.getMessageCount());
+    }
+
+    /**
+     * Publishes one message with the given key and checks that routing fails with an
+     * AMQException and that nothing is placed on the queue.
+     */
+    private void assertNotRouted(AMQQueue queue, String routingKey) throws AMQException
+    {
+        AMQMessage message = createMessage(routingKey);
+
+        try
+        {
+            _exchange.route(message);
+            message.routingComplete(_store, _context, new MessageHandleFactory());
+            fail("Message with key '" + routingKey + "' has no route and should not be routed");
+        }
+        catch (AMQException expected)
+        {
+            // expected outcome for a key that no binding matches
+        }
+
+        Assert.assertEquals(0, queue.getMessageCount());
+    }
+
+    /** Builds a mandatory message with the given routing key and an empty content header. */
+    private AMQMessage createMessage(String s) throws AMQException
+    {
+        MessagePublishInfo info = new PublishInfo(new AMQShortString(s));
+
+        TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null,
+                                                                       new LinkedList<RequiredDeliveryException>(),
+                                                                       new HashSet<Long>());
+
+        AMQMessage message = new AMQMessage(0L, info, trancontext);
+        message.setContentHeaderBody(new ContentHeaderBody());
+
+        return message;
+    }
+
+
+    /** Minimal publish info: mandatory, not immediate, no exchange name, fixed routing key. */
+    class PublishInfo implements MessagePublishInfo
+    {
+        AMQShortString _routingkey;
+
+        PublishInfo(AMQShortString routingkey)
+        {
+            _routingkey = routingkey;
+        }
+
+        public AMQShortString getExchange()
+        {
+            return null;
+        }
+
+        public boolean isImmediate()
+        {
+            return false;
+        }
+
+        public boolean isMandatory()
+        {
+            return true;
+        }
+
+        public AMQShortString getRoutingKey()
+        {
+            return _routingkey;
+        }
+    }
+}

Propchange: incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=diff&rev=538240&r1=538239&r2=538240
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Tue May 15 09:19:01 2007
@@ -251,7 +251,7 @@
             ;
         }
 
-        assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
+        assertEquals("Wrong number of messages bounced: ", 1, _bouncedMessageList.size());
         Message m = _bouncedMessageList.get(0);
         assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));