You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/05/15 18:19:02 UTC
svn commit: r538240 - in /incubator/qpid/branches/M2/java:
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/test/java/org/apache/qpid/server/exchange/
systests/src/main/java/org/apache/qpid/server/exchange/
Author: ritchiem
Date: Tue May 15 09:19:01 2007
New Revision: 538240
URL: http://svn.apache.org/viewvc?view=rev&rev=538240
Log:
QPID-3 Topic Matching with tests
A simple, naive approach, similar to the C++ implementation, to be included in M2.
A more elaborate, pre-evaluated version will have to wait.
Once benchmarks have been performed we can evaluate what performance advantages, if any, that approach offers.
Added:
incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (with props)
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=538240&r1=538239&r2=538240
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Tue May 15 09:19:01 2007
@@ -23,6 +23,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -56,6 +58,10 @@
private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
+ private static final String TOPIC_SEPARATOR = ".";
+ private static final String AMQP_STAR = "*";
+ private static final String AMQP_HASH = "#";
/** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */
@MBeanDescription("Management Bean for Topic Exchange")
@@ -78,7 +84,7 @@
AMQShortString key = entry.getKey();
List<String> queueList = new ArrayList<String>();
- List<AMQQueue> queues = entry.getValue();
+ List<AMQQueue> queues = getMatchedQueues(key);
for (AMQQueue q : queues)
{
queueList.add(q.getName().toString());
@@ -118,10 +124,13 @@
return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
}
- public synchronized void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
- assert routingKey != null;
+ assert rKey != null;
+
+ AMQShortString routingKey = normalize(rKey);
+
_logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
// we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
@@ -142,15 +151,67 @@
}
+    /**
+     * Rewrites a routing or binding key into a canonical form, so that equivalent
+     * wildcard patterns end up as the same map key.
+     * <p>
+     * The key is split into words on {@code TOPIC_SEPARATOR} (the tokenizer drops empty
+     * words, so "a..b" becomes "a.b") and then two rewrites are applied until neither fits:
+     * <ul>
+     * <li>"#.#" is collapsed to a single "#";</li>
+     * <li>"#.*" is swapped to "*.#".</li>
+     * </ul>
+     *
+     * @param routingKey the key to normalise; must not be null
+     * @return the normalised key, or {@code routingKey} itself when it contains no words
+     *         at all (for example "" or ".")
+     */
+    private AMQShortString normalize(AMQShortString routingKey)
+    {
+        StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
+        List<String> words = new ArrayList<String>();
+
+        while (routingTokens.hasMoreTokens())
+        {
+            words.add(routingTokens.nextToken());
+        }
+
+        // A key without any words has nothing to normalise. Without this guard the
+        // join below would call deleteCharAt(-1) on an empty builder and throw.
+        if (words.isEmpty())
+        {
+            return routingKey;
+        }
+
+        int index = 0;
+
+        // Only positions that still have a following word can need rewriting.
+        while (index + 1 < words.size())
+        {
+            if (words.get(index).equals(AMQP_HASH))
+            {
+                String next = words.get(index + 1);
+
+                if (next.equals(AMQP_HASH))
+                {
+                    // "#.#" is redundant: drop this hash and re-examine the same position,
+                    // which now holds the hash that followed it.
+                    words.remove(index);
+                    continue;
+                }
+
+                if (next.equals(AMQP_STAR))
+                {
+                    // "#.*" becomes "*.#"; the hash is looked at again on the next pass
+                    // so that runs such as "#.*.*" are fully reordered.
+                    words.set(index, AMQP_STAR);
+                    words.set(index + 1, AMQP_HASH);
+                }
+            }
+
+            index++;
+        }
+
+        StringBuilder sb = new StringBuilder();
+
+        for (String word : words)
+        {
+            // words are never empty, so a non-empty builder means "not the first word"
+            if (sb.length() > 0)
+            {
+                sb.append(TOPIC_SEPARATOR);
+            }
+
+            sb.append(word);
+        }
+
+        return new AMQShortString(sb.toString());
+    }
+
+
public void route(AMQMessage payload) throws AMQException
{
MessagePublishInfo info = payload.getMessagePublishInfo();
- final AMQShortString routingKey = info.getRoutingKey();
- List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ final AMQShortString routingKey = normalize(info.getRoutingKey());
+
+ List<AMQQueue> queues = getMatchedQueues(routingKey);
// if we have no registered queues we have nothing to do
// TODO: add support for the immediate flag
- if (queues == null)
+ if (queues == null || queues.size() == 0)
{
if (info.isMandatory())
{
@@ -177,14 +238,14 @@
public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
{
- List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
return queues != null && queues.contains(queue);
}
public boolean isBound(AMQShortString routingKey) throws AMQException
{
- List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
return queues != null && !queues.isEmpty();
}
@@ -205,10 +266,12 @@
return !_routingKey2queues.isEmpty();
}
- public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
- assert routingKey != null;
+ assert rKey != null;
+
+ AMQShortString routingKey = normalize(rKey);
List<AMQQueue> queues = _routingKey2queues.get(routingKey);
if (queues == null)
@@ -240,5 +303,111 @@
_logger.error("Exception occured in creating the topic exchenge mbean", ex);
throw new AMQException("Exception occured in creating the topic exchenge mbean", ex);
}
+ }
+
+
+    /**
+     * Collects the queues of every binding whose pattern matches the given routing key.
+     * <p>
+     * Both the routing key and each binding key are split into words on
+     * {@code TOPIC_SEPARATOR}. In a binding, "*" stands for exactly one word and "#" for
+     * zero or more words. Consecutive "#" words in the routing key are collapsed to one.
+     * <p>
+     * A queue that is reachable through several matching bindings appears only once in
+     * the result.
+     *
+     * @param routingKey the key to match against the registered bindings; must not be null
+     * @return the matching queues, in the order they were first encountered; never null,
+     *         possibly empty
+     */
+    private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
+    {
+        List<AMQQueue> matched = new LinkedList<AMQQueue>();
+        List<String> keyWords = new ArrayList<String>();
+        StringTokenizer keyTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
+
+        while (keyTokens.hasMoreTokens())
+        {
+            String next = keyTokens.nextToken();
+
+            // Collapse "#.#". The isEmpty() check matters: a key whose first word is "#"
+            // would otherwise look at index -1 and throw.
+            if (next.equals(AMQP_HASH) && !keyWords.isEmpty()
+                && keyWords.get(keyWords.size() - 1).equals(AMQP_HASH))
+            {
+                continue;
+            }
+
+            keyWords.add(next);
+        }
+
+        // Iterate over entries rather than keys so the queue list is read in the same step
+        // as the key, instead of through a second lookup that could come back null.
+        for (Map.Entry<AMQShortString, List<AMQQueue>> binding : _routingKey2queues.entrySet())
+        {
+            List<String> patternWords = new ArrayList<String>();
+            StringTokenizer patternTokens = new StringTokenizer(binding.getKey().toString(), TOPIC_SEPARATOR);
+
+            while (patternTokens.hasMoreTokens())
+            {
+                patternWords.add(patternTokens.nextToken());
+            }
+
+            if (topicWordsMatch(patternWords, 0, keyWords, 0))
+            {
+                for (AMQQueue boundQueue : binding.getValue())
+                {
+                    // avoid handing the same queue back twice when several bindings match
+                    if (!matched.contains(boundQueue))
+                    {
+                        matched.add(boundQueue);
+                    }
+                }
+            }
+        }
+
+        return matched;
+    }
+
+    /**
+     * Tests whether the binding words from position p onwards match the key words from
+     * position k onwards.
+     * <p>
+     * A "#" in the pattern tries every possible number of key words (including none), so
+     * a pattern such as "a.#.b.c" matches "a.b.x.b.c" even though the first "b" in the
+     * key is not the one that lines up with the pattern.
+     *
+     * @param pattern the words of the binding key
+     * @param p       index of the first pattern word still to be matched
+     * @param key     the words of the routing key
+     * @param k       index of the first key word still to be matched
+     * @return true if the remaining pattern words match all remaining key words
+     */
+    private static boolean topicWordsMatch(List<String> pattern, int p, List<String> key, int k)
+    {
+        if (p == pattern.size())
+        {
+            // pattern exhausted: only a match if the key is exhausted too
+            return k == key.size();
+        }
+
+        String word = pattern.get(p);
+
+        if (word.equals(AMQP_HASH))
+        {
+            // let "#" swallow 0, 1, 2, ... key words and see whether the rest lines up
+            for (int resume = k; resume <= key.size(); resume++)
+            {
+                if (topicWordsMatch(pattern, p + 1, key, resume))
+                {
+                    return true;
+                }
+            }
+
+            return false;
+        }
+
+        if (k == key.size())
+        {
+            // a literal word or "*" needs a key word to consume
+            return false;
+        }
+
+        if (word.equals(AMQP_STAR) || word.equals(key.get(k)))
+        {
+            return topicWordsMatch(pattern, p + 1, key, k + 1);
+        }
+
+        return false;
+    }
}
Added: incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?view=auto&rev=538240
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (added)
+++ incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Tue May 15 09:19:01 2007
@@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Exercises the wildcard matching of {@link DestWildExchange}: each test binds one queue
+ * with a binding key and then checks which routing keys do and do not deliver to it.
+ */
+public class DestWildExchangeTest extends TestCase
+{
+
+    DestWildExchange _exchange;
+
+    VirtualHost _vhost;
+    MessageStore _store;
+    StoreContext _context;
+
+
+    public void setUp() throws AMQException
+    {
+        _exchange = new DestWildExchange();
+        _vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
+        _store = new MemoryMessageStore();
+        _context = new StoreContext();
+    }
+
+
+    public void testNoRoute() throws AMQException
+    {
+        AMQQueue queue = new AMQQueue(new AMQShortString("a*#b"), false, null, false, _vhost);
+        _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
+
+
+        MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b"));
+
+        AMQMessage message = new AMQMessage(0L, info, null);
+
+        try
+        {
+            _exchange.route(message);
+            fail("Message has no route and shouldn't be routed");
+        }
+        catch (NoRouteException nre)
+        {
+            //normal
+        }
+
+        Assert.assertEquals(0, queue.getMessageCount());
+    }
+
+    public void testDirectMatch() throws AMQException
+    {
+        AMQQueue queue = bindQueue("ab", "a.b");
+
+        assertRouted("a.b", queue);
+        assertNotRouted("a.c", queue);
+    }
+
+
+    public void testStarMatch() throws AMQException
+    {
+        AMQQueue queue = bindQueue("a*", "a.*");
+
+        assertRouted("a.b", queue);
+        assertRouted("a.c", queue);
+        assertNotRouted("a", queue);
+    }
+
+    public void testHashMatch() throws AMQException
+    {
+        AMQQueue queue = bindQueue("a#", "a.#");
+
+        assertRouted("a.b.c", queue);
+        assertRouted("a.b", queue);
+        assertRouted("a.c", queue);
+        assertRouted("a", queue);
+        assertNotRouted("b", queue);
+    }
+
+
+    public void testMidHash() throws AMQException
+    {
+        AMQQueue queue = bindQueue("a", "a.*.#.b");
+
+        assertRouted("a.c.d.b", queue);
+        assertRouted("a.c.b", queue);
+    }
+
+    public void testMatchafterHash() throws AMQException
+    {
+        AMQQueue queue = bindQueue("a#", "a.*.#.b.c");
+
+        assertNotRouted("a.c.b.b", queue);
+        assertRouted("a.a.b.c", queue);
+        assertNotRouted("a.b.c.b", queue);
+        assertRouted("a.b.c.b.c", queue);
+    }
+
+
+    public void testHashAfterHash() throws AMQException
+    {
+        AMQQueue queue = bindQueue("a#", "a.*.#.b.c.#.d");
+
+        assertNotRouted("a.c.b.b.c", queue);
+        assertRouted("a.a.b.c.d", queue);
+    }
+
+    public void testHashHash() throws AMQException
+    {
+        AMQQueue queue = bindQueue("a#", "a.#.*.#.d");
+
+        assertNotRouted("a.c.b.b.c", queue);
+        assertRouted("a.a.b.c.d", queue);
+    }
+
+    public void testSubMatchFails() throws AMQException
+    {
+        AMQQueue queue = bindQueue("a", "a.b.c.d");
+
+        assertNotRouted("a.b.c", queue);
+    }
+
+    public void testMoreRouting() throws AMQException
+    {
+        AMQQueue queue = bindQueue("a", "a.b");
+
+        assertNotRouted("a.b.c", queue);
+    }
+
+    public void testMoreQueue() throws AMQException
+    {
+        AMQQueue queue = bindQueue("a", "a.b");
+
+        assertNotRouted("a", queue);
+    }
+
+    /**
+     * Creates a queue on the test virtual host and binds it to the exchange under test.
+     *
+     * @param queueName  name given to the new queue
+     * @param bindingKey binding (possibly containing wildcards) to register the queue with
+     * @return the newly created, bound queue
+     */
+    private AMQQueue bindQueue(String queueName, String bindingKey) throws AMQException
+    {
+        AMQQueue queue = new AMQQueue(new AMQShortString(queueName), false, null, false, _vhost);
+        _exchange.registerQueue(new AMQShortString(bindingKey), queue, null);
+
+        return queue;
+    }
+
+    /**
+     * Publishes a message with the given routing key and checks that exactly that message
+     * arrives on the queue; the queue is emptied again before returning.
+     */
+    private void assertRouted(String routingKey, AMQQueue queue) throws AMQException
+    {
+        AMQMessage message = createMessage(routingKey);
+
+        try
+        {
+            _exchange.route(message);
+            message.routingComplete(_store, _context, new MessageHandleFactory());
+        }
+        catch (AMQException e)
+        {
+            fail("Message with routing key '" + routingKey + "' has a route and should be routed: " + e);
+        }
+
+        Assert.assertEquals(1, queue.getMessageCount());
+
+        Assert.assertEquals("Wrong message received", message, queue.getMessagesOnTheQueue().get(0));
+
+        queue.deleteMessageFromTop(_context);
+        Assert.assertEquals(0, queue.getMessageCount());
+    }
+
+    /**
+     * Publishes a message with the given routing key and checks that routing is refused
+     * with an AMQException and that nothing arrives on the queue.
+     */
+    private void assertNotRouted(String routingKey, AMQQueue queue) throws AMQException
+    {
+        AMQMessage message = createMessage(routingKey);
+
+        try
+        {
+            _exchange.route(message);
+            message.routingComplete(_store, _context, new MessageHandleFactory());
+            fail("Message with routing key '" + routingKey + "' has no route and should not be routed");
+        }
+        catch (AMQException expected)
+        {
+            // expected: the message is mandatory and no binding matches
+        }
+
+        Assert.assertEquals(0, queue.getMessageCount());
+    }
+
+    private AMQMessage createMessage(String s) throws AMQException
+    {
+        MessagePublishInfo info = new PublishInfo(new AMQShortString(s));
+
+        TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null,
+                                                                       new LinkedList<RequiredDeliveryException>(),
+                                                                       new HashSet<Long>());
+
+        AMQMessage message = new AMQMessage(0L, info, trancontext);
+        message.setContentHeaderBody(new ContentHeaderBody());
+
+        return message;
+    }
+
+
+    /** Minimal publish info: mandatory, not immediate, no exchange name, fixed routing key. */
+    class PublishInfo implements MessagePublishInfo
+    {
+        AMQShortString _routingkey;
+
+        PublishInfo(AMQShortString routingkey)
+        {
+            _routingkey = routingkey;
+        }
+
+        public AMQShortString getExchange()
+        {
+            return null;
+        }
+
+        public boolean isImmediate()
+        {
+            return false;
+        }
+
+        public boolean isMandatory()
+        {
+            return true;
+        }
+
+        public AMQShortString getRoutingKey()
+        {
+            return _routingkey;
+        }
+    }
+}
Propchange: incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=diff&rev=538240&r1=538239&r2=538240
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Tue May 15 09:19:01 2007
@@ -251,7 +251,7 @@
;
}
- assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
+ assertEquals("Wrong number of messages bounced: ", 1, _bouncedMessageList.size());
Message m = _bouncedMessageList.get(0);
assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));