You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/11 17:22:05 UTC

svn commit: r655323 [1/4] - in /incubator/qpid/branches/broker-queue-refactor/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main...

Author: rgodfrey
Date: Sun May 11 08:22:03 2008
New Revision: 655323

URL: http://svn.apache.org/viewvc?rev=655323&view=rev
Log:
Updates on the refactoring work

Added:
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
      - copied, changed from r650226, incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java
    incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
Removed:
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueImpl.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryAgent.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
Modified:
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    incubator/qpid/branches/broker-queue-refactor/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
    incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
    incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
    incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/Job.java
    incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
    incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
    incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java
    incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
    incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Sun May 11 08:22:03 2008
@@ -176,7 +176,8 @@
                 ownerShortString = new AMQShortString(owner);
             }
 
-            queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost());
+            queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost(),
+                                                       null);
             if (queue.isDurable() && !queue.isAutoDelete())
             {
                 _messageStore.createQueue(queue);

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Sun May 11 08:22:03 2008
@@ -44,6 +44,8 @@
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.LocalTransactionalContext;
@@ -435,10 +437,8 @@
             }
         }
 
-        synchronized (_unacknowledgedMessageMap.getLock())
-        {
-            _unacknowledgedMessageMap.add(deliveryTag, entry);
-        }
+        _unacknowledgedMessageMap.add(deliveryTag, entry);
+
     }
 
     private final String id = "(" + System.identityHashCode(this) + ")";
@@ -807,22 +807,7 @@
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
     {
-        synchronized (_unacknowledgedMessageMap.getLock())
-        {
-            if (_log.isDebugEnabled())
-            {
-                _log.debug("Unacked (PreAck) Size:" + _unacknowledgedMessageMap.size());
-            }
-
-            _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
-
-            if (_log.isDebugEnabled())
-            {
-                _log.debug("Unacked (PostAck) Size:" + _unacknowledgedMessageMap.size());
-            }
-
-        }
-
+        _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
     }
 
     /**
@@ -952,4 +937,33 @@
     {
         return _messageStore;
     }
+
+    private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod()
+        {
+
+            public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+                    throws AMQException
+            {
+               getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag());
+            }
+        };
+
+    public ClientDeliveryMethod getClientDeliveryMethod()
+    {
+        return _clientDeliveryMethod;
+    }
+
+    private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
+        {
+
+            public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            {
+                addUnacknowledgedMessage(entry, deliveryTag, sub);
+            }
+        };
+
+    public RecordDeliveryMethod getRecordDeliveryMethod()
+    {
+        return _recordDeliveryMethod;
+    }
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Sun May 11 08:22:03 2008
@@ -45,8 +45,6 @@
 
     void visit(Visitor visitor) throws AMQException;
 
-    Object getLock();
-
     void add(long deliveryTag, QueueEntry message);
 
     void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs);

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Sun May 11 08:22:03 2008
@@ -113,11 +113,6 @@
         }
     }
 
-    public Object getLock()
-    {
-        return _lock;
-    }
-
     public void add(long deliveryTag, QueueEntry message)
     {
         synchronized (_lock)

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Sun May 11 08:22:03 2008
@@ -30,6 +30,7 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.exchange.ExchangeFactory;
@@ -177,11 +178,22 @@
                 boolean durable = queueConfiguration.getBoolean("durable" ,false);
                 boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
                 String owner = queueConfiguration.getString("owner", null);
+                FieldTable arguments = null;
+                Integer priorities = queueConfiguration.getInteger("priorities", null);
+                if(priorities != null && priorities.intValue() > 1)
+                {
+                    if(arguments == null)
+                    {
+                        arguments = new FieldTable();
+                    }
+                    arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
+                }
+
 
                 queue = AMQQueueFactory.createAMQQueueImpl(queueName,
                         durable,
                         owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */,
-                        autodelete /* Therefore autodelete makes no sence */, virtualHost);
+                        autodelete /* Therefore autodelete makes no sence */, virtualHost, arguments);
 
                 if (queue.isDurable())
                 {

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Sun May 11 08:22:03 2008
@@ -38,7 +38,6 @@
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueueImpl;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -192,9 +191,7 @@
         {
             _exchangeMbean.unregister();
         }
-    }
-
-    abstract public Map<AMQShortString, List<AMQQueue>> getBindings();
+    }    
 
     public String toString()
     {

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Sun May 11 08:22:03 2008
@@ -93,6 +93,6 @@
      */
     boolean hasBindings();
 
-    Map<AMQShortString, List<AMQQueue>> getBindings();
+    
 
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Sun May 11 08:22:03 2008
@@ -31,7 +31,6 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQQueueImpl;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Sun May 11 08:22:03 2008
@@ -22,6 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
@@ -31,7 +32,12 @@
 import org.apache.qpid.server.management.MBeanDescription;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.topic.TopicParser;
+import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -43,6 +49,9 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.lang.ref.WeakReference;
 
 public class TopicExchange extends AbstractExchange
 {
@@ -80,22 +89,204 @@
 
     private static final Logger _logger = Logger.getLogger(TopicExchange.class);
 
+/*
     private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues =
             new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
     private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues =
             new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
     private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues =
             new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+*/
     // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
     private static final byte TOPIC_SEPARATOR = (byte)'.';
     private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString(".");
     private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*");
     private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#");
-    private ConcurrentHashMap<AMQShortString, AMQShortString[]> _bindingKey2Tokenized =
-            new ConcurrentHashMap<AMQShortString, AMQShortString[]>();
+
     private static final byte HASH_BYTE = (byte)'#';
     private static final byte STAR_BYTE = (byte)'*';
 
+    private final TopicParser _parser = new TopicParser();
+
+    private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
+            new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
+
+    private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
+
+    private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>();
+
+    public static class Binding
+    {
+        private final AMQShortString _bindingKey;
+        private final AMQQueue _queue;
+
+        public Binding(AMQShortString bindingKey, AMQQueue queue)
+        {
+            _bindingKey = bindingKey;
+            _queue = queue;
+        }
+
+        public AMQShortString getBindingKey()
+        {
+            return _bindingKey;
+        }
+
+        public AMQQueue getQueue()
+        {
+            return _queue;
+        }
+
+        public int hashCode()
+        {
+            return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 + _queue.hashCode();
+        }
+
+        public boolean equals(Object o)
+        {
+            if(this == o)
+            {
+                return true;
+            }
+            if(o instanceof Binding)
+            {
+                Binding other = (Binding) o;
+                return (_queue == other._queue)
+                        && ((_bindingKey == null) ? other._bindingKey == null : _bindingKey.equals(other._bindingKey));
+            }
+            return false;
+        }
+    }
+
+
+
+    private final class TopicExchangeResult implements TopicMatcherResult
+    {
+        private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
+        private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>();
+
+        public void addUnfilteredQueue(AMQQueue queue)
+        {
+            Integer instances = _unfilteredQueues.get(queue);
+            if(instances == null)
+            {
+                _unfilteredQueues.put(queue, 1);
+            }
+            else
+            {
+                _unfilteredQueues.put(queue, instances + 1);
+            }
+        }
+
+        public void removeUnfilteredQueue(AMQQueue queue)
+        {
+            Integer instances = _unfilteredQueues.get(queue);
+            if(instances == 1)
+            {
+                _unfilteredQueues.remove(queue);
+            }
+            else
+            {
+                _unfilteredQueues.put(queue,instances - 1);
+            }
+
+        }
+
+
+        public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+        {
+            Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+            if(filters == null)
+            {
+                filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>();
+                _filteredQueues.put(queue, filters);
+            }
+            Integer instances = filters.get(filter);
+            if(instances == null)
+            {
+                filters.put(filter,1);
+            }
+            else
+            {
+                filters.put(filter, instances + 1);
+            }
+
+        }
+
+        public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+        {
+            Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+            if(filters != null)
+            {
+                Integer instances = filters.get(filter);
+                if(instances == 1)
+                {
+                    filters.remove(filter);
+                    if(filters.isEmpty())
+                    {
+                        _filteredQueues.remove(queue);
+                    }
+                }
+                else if(instances != null)
+                {
+                    filters.put(filter, instances - 1);
+                }
+
+            }
+
+        }
+
+        public void replaceQueueFilter(AMQQueue queue,
+                                       MessageFilter<RuntimeException> oldFilter,
+                                       MessageFilter<RuntimeException> newFilter)
+        {
+            Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+            Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters);
+            Integer oldFilterInstances = filters.get(oldFilter);
+            if(oldFilterInstances == 1)
+            {
+                newFilters.remove(oldFilter);
+            }
+            else
+            {
+                newFilters.put(oldFilter, oldFilterInstances-1);
+            }
+            Integer newFilterInstances = filters.get(newFilter);
+            if(newFilterInstances == null)
+            {
+                newFilters.put(newFilter, 1);
+            }
+            else
+            {
+                newFilters.put(newFilter, newFilterInstances+1);
+            }
+            _filteredQueues.put(queue,newFilters);
+        }
+
+        public Set<AMQQueue> processMessage(IncomingMessage msg, Set<AMQQueue> queues)
+        {
+            queues.addAll(_unfilteredQueues.keySet());
+            if(!_filteredQueues.isEmpty())
+            {
+                for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet())
+                {
+                    if(!queues.contains(entry.getKey()))
+                    {
+                        for(MessageFilter<RuntimeException> filter : entry.getValue().keySet())
+                        {
+                            if(filter.matches(msg))
+                            {
+                                queues.add(entry.getKey());
+                            }
+                        }
+                    }
+                }
+            }
+            return queues;
+        }
+
+    }
+
+
     /** TopicExchangeMBean class implements the management interface for the Topic exchanges. */
     @MBeanDescription("Management Bean for Topic Exchange")
     private final class TopicExchangeMBean extends ExchangeMBean
@@ -112,20 +303,24 @@
         public TabularData bindings() throws OpenDataException
         {
             _bindingList = new TabularDataSupport(_bindinglistDataType);
-            for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _bindingKey2queues.entrySet())
+            Map<String, List<String>> bindingData = new HashMap<String, List<String>>();
+            for (Binding binding : _bindings.keySet())
             {
-                AMQShortString key = entry.getKey();
-                List<String> queueList = new ArrayList<String>();
-
-                List<AMQQueue> queues = getMatchedQueues(key);
-                for (AMQQueue q : queues)
+                String key = binding.getBindingKey().toString();
+                List<String> queueNames = bindingData.get(key);
+                if(queueNames == null)
                 {
-                    queueList.add(q.getName().toString());
+                    queueNames = new ArrayList<String>();
+                    bindingData.put(key, queueNames);
                 }
+                queueNames.add(binding.getQueue().getName().toString());
 
-                Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[queueList.size()])};
-                CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
-                _bindingList.put(bindingData);
+            }
+            for(Map.Entry<String, List<String>> entry : bindingData.entrySet())
+            {
+                Object[] bindingItemValues = {entry.getKey(), entry.getValue().toArray(new String[entry.getValue().size()]) };
+                CompositeData bindingCompositeData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
+                _bindingList.put(bindingCompositeData);
             }
 
             return _bindingList;
@@ -163,73 +358,106 @@
 
         _logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey);
 
-        // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
-        List<AMQQueue> queueList = _bindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
-
-
 
+        AMQShortString routingKey;
 
-
-
-
-        // if we got null back, no previous value was associated with the specified routing key hence
-        // we need to read back the new value just put into the map
-        if (queueList == null)
+        if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
         {
-            queueList = _bindingKey2queues.get(rKey);
+            routingKey = normalize(rKey);
+        }
+        else
+        {
+            routingKey = rKey;
         }
 
+        Binding binding = new Binding(rKey, queue);
 
-
-        if (!queueList.contains(queue))
+        if(_bindings.containsKey(binding))
         {
-            queueList.add(queue);
+            FieldTable oldArgs = _bindings.get(binding);
+            TopicExchangeResult result = _topicExchangeResults.get(routingKey);
 
-
-            if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+            if(argumentsContainSelector(args))
             {
-                AMQShortString routingKey = normalize(rKey);
-                List<AMQQueue> queueList2 = _wildCardBindingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
-
-                if(queueList2 == null)
+                if(argumentsContainSelector(oldArgs))
                 {
-                    queueList2 = _wildCardBindingKey2queues.get(routingKey);
-                    AMQShortStringTokenizer keyTok = routingKey.tokenize(TOPIC_SEPARATOR);
-
-                    ArrayList<AMQShortString> keyTokList = new ArrayList<AMQShortString>(keyTok.countTokens());
-
-                    while (keyTok.hasMoreTokens())
-                    {
-                        keyTokList.add(keyTok.nextToken());
-                    }
-
-                    _bindingKey2Tokenized.put(routingKey, keyTokList.toArray(new AMQShortString[keyTokList.size()]));
+                    result.replaceQueueFilter(queue,createSelectorFilter(oldArgs), createSelectorFilter(args));
+                }
+                else
+                {
+                    result.addFilteredQueue(queue,createSelectorFilter(args));
+                    result.removeUnfilteredQueue(queue);
                 }
-                queueList2.add(queue);
-
             }
             else
             {
-                List<AMQQueue> queueList2 = _simpleBindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
-                if(queueList2 == null)
+                if(argumentsContainSelector(oldArgs))
+                {
+                    result.addUnfilteredQueue(queue);
+                    result.removeFilteredQueue(queue, createSelectorFilter(oldArgs));
+                }
+                else
                 {
-                    queueList2 = _simpleBindingKey2queues.get(rKey);
+                    // TODO - fix control flow
+                    return;
                 }
-                queueList2.add(queue);
+            }
+
+        }
+        else
+        {
 
+            TopicExchangeResult result = _topicExchangeResults.get(routingKey);
+            if(result == null)
+            {
+                result = new TopicExchangeResult();
+                if(argumentsContainSelector(args))
+                {
+                    result.addFilteredQueue(queue, createSelectorFilter(args));
+                }
+                else
+                {
+                    result.addUnfilteredQueue(queue);
+                }
+                _parser.addBinding(routingKey, result);    
+                _topicExchangeResults.put(routingKey,result);
             }
+            else                        
+            {
+                if(argumentsContainSelector(args))
+                {
+                    result.addFilteredQueue(queue, createSelectorFilter(args));
+                }
+                else
+                {
+                    result.addUnfilteredQueue(queue);
+                }
+            }
+            _bindings.put(binding, args);
+        }
 
 
+    }
 
+    private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args)
+            throws AMQException
+    {
 
-        }
-        else if (_logger.isDebugEnabled())
+        final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
+        WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString);
+        JMSSelectorFilter selector = null;
+
+        if(selectorRef == null || (selector = selectorRef.get())==null)
         {
-            _logger.debug("Queue " + queue + " is already registered with routing key " + rKey);
+            selector = new JMSSelectorFilter<RuntimeException>(selectorString);
+            _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
         }
+        return selector;
+    }
 
-
-
+    private static boolean argumentsContainSelector(final FieldTable args)
+    {
+        return args != null && args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0;
     }
 
     private AMQShortString normalize(AMQShortString routingKey)
@@ -279,16 +507,6 @@
 
 
         AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING);
-/*
-        StringBuilder sb = new StringBuilder();
-        for (AMQShortString s : subscriptionList)
-        {
-            sb.append(s);
-            sb.append(TOPIC_SEPARATOR);
-        }
-
-        sb.deleteCharAt(sb.length() - 1);
-*/
 
         return normalizedString;
     }
@@ -298,11 +516,11 @@
 
         final AMQShortString routingKey = payload.getRoutingKey();
 
-        List<AMQQueue> queues = getMatchedQueues(routingKey);
+        Collection<AMQQueue> queues = getMatchedQueues(payload, routingKey);
 
         if(queues == null || queues.isEmpty())
         {
-            _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes - " + _bindingKey2queues);
+            _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
         }
 
         payload.enqueue(queues);
@@ -316,23 +534,29 @@
 
     public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
-        List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
+        Binding binding = new Binding(routingKey, queue);
 
-        return (queues != null) && queues.contains(queue);
+        return _bindings.containsKey(binding);
     }
 
     public boolean isBound(AMQShortString routingKey)
     {
-        List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
+        for(Binding b : _bindings.keySet())
+        {
+            if(b.getBindingKey().equals(routingKey))
+            {
+                return true;
+            }
+        }
 
-        return (queues != null) && !queues.isEmpty();
+        return false;
     }
 
     public boolean isBound(AMQQueue queue)
     {
-        for (List<AMQQueue> queues : _bindingKey2queues.values())
+        for(Binding b : _bindings.keySet())
         {
-            if (queues.contains(queue))
+            if(b.getQueue().equals(queue))
             {
                 return true;
             }
@@ -343,7 +567,7 @@
 
     public boolean hasBindings()
     {
-        return !_bindingKey2queues.isEmpty();
+        return !_bindings.isEmpty();
     }
 
     public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
@@ -351,52 +575,27 @@
         assert queue != null;
         assert rKey != null;
 
-        List<AMQQueue> queues = _bindingKey2queues.get(rKey);
-        if (queues == null)
-        {
-            throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
-                                   + " with routing key " + rKey + ". No queue was registered with that _routing key");
+        Binding binding = new Binding(rKey, queue);
 
-        }
 
-        boolean removedQ = queues.remove(queue);
-        if (!removedQ)
+        if (!_bindings.containsKey(binding))
         {
-            throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
-                                   + " with routing key " + rKey);
+            throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue.getName() + " was not registered with exchange " + this.getName()
+                                   + " with routing key " + rKey + ".");
         }
 
-
-        if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+        FieldTable bindingArgs = _bindings.remove(binding);
+        AMQShortString bindingKey = normalize(rKey);
+        TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
+        if(argumentsContainSelector(bindingArgs))
         {
-            AMQShortString bindingKey = normalize(rKey);
-            List<AMQQueue> queues2 = _wildCardBindingKey2queues.get(bindingKey);
-            queues2.remove(queue);
-            if(queues2.isEmpty())
-            {
-                _wildCardBindingKey2queues.remove(bindingKey);
-                _bindingKey2Tokenized.remove(bindingKey);
-            }
-
+            result.removeFilteredQueue(queue, createSelectorFilter(bindingArgs));
         }
         else
         {
-            List<AMQQueue> queues2 = _simpleBindingKey2queues.get(rKey);
-            queues2.remove(queue);
-            if(queues2.isEmpty())
-            {
-                _simpleBindingKey2queues.remove(rKey);
-            }
-
+            result.removeUnfilteredQueue(queue);
         }
 
-
-
-
-        if (queues.isEmpty())
-        {
-            _bindingKey2queues.remove(rKey);
-        }
     }
 
     protected ExchangeMBean createMBean() throws AMQException
@@ -412,172 +611,25 @@
         }
     }
 
-    public Map<AMQShortString, List<AMQQueue>> getBindings()
+    private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
     {
-        return _bindingKey2queues;
-    }
-
-    private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
-    {
-
-        List<AMQQueue> list = null;
 
-        if(!_wildCardBindingKey2queues.isEmpty())
+        Collection<TopicMatcherResult> results = _parser.parse(routingKey);
+        if(results.isEmpty())
         {
-
-
-            AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
-
-            final int routingTokensCount = routingTokens.countTokens();
-
-
-            AMQShortString[] routingkeyTokens = new AMQShortString[routingTokensCount];
-
-            if(routingTokensCount == 1)
-            {
-                routingkeyTokens[0] =routingKey;
-            }
-            else
-            {
-
-
-                int token = 0;
-                while (routingTokens.hasMoreTokens())
-                {
-
-                    AMQShortString next = routingTokens.nextToken();
-        /*            if (next.equals(AMQP_HASH) && routingkeyTokens.get(routingkeyTokens.size() - 1).equals(AMQP_HASH))
-                    {
-                        continue;
-                    }
-        */
-
-                    routingkeyTokens[token++] = next;
-                }
-            }
-
-            _logger.info("Routing key tokens: " + Arrays.asList(routingkeyTokens));
-
-            for (AMQShortString bindingKey : _wildCardBindingKey2queues.keySet())
-            {
-
-                AMQShortString[] bindingKeyTokens = _bindingKey2Tokenized.get(bindingKey);
-
-
-                boolean matching = true;
-                boolean done = false;
-
-                int depthPlusRoutingSkip = 0;
-                int depthPlusQueueSkip = 0;
-
-                final int bindingKeyTokensCount = bindingKeyTokens.length;
-
-                while (matching && !done)
-                {
-
-                    if ((bindingKeyTokensCount == depthPlusQueueSkip) || (routingTokensCount == depthPlusRoutingSkip))
-                    {
-                        done = true;
-
-                        // if it was the routing key that ran out of digits
-                        if (routingTokensCount == depthPlusRoutingSkip)
-                        {
-                            if (bindingKeyTokensCount > depthPlusQueueSkip)
-                            { // a hash and it is the last entry
-                                matching =
-                                        bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN)
-                                        && (bindingKeyTokensCount == (depthPlusQueueSkip + 1));
-                            }
-                        }
-                        else if (routingTokensCount > depthPlusRoutingSkip)
-                        {
-                            // There is still more routing key to check
-                            matching = false;
-                        }
-
-                        continue;
-                    }
-
-                    // if the values on the two topics don't match
-                    if (!bindingKeyTokens[depthPlusQueueSkip].equals(routingkeyTokens[depthPlusRoutingSkip]))
-                    {
-                        if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_STAR_TOKEN))
-                        {
-                            depthPlusQueueSkip++;
-                            depthPlusRoutingSkip++;
-
-                            continue;
-                        }
-                        else if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN))
-                        {
-                            // Is this a # at the end
-                            if (bindingKeyTokensCount == (depthPlusQueueSkip + 1))
-                            {
-                                done = true;
-
-                                continue;
-                            }
-
-                            // otherwise # in the middle
-                            while (routingTokensCount > depthPlusRoutingSkip)
-                            {
-                                if (routingkeyTokens[depthPlusRoutingSkip].equals(bindingKeyTokens[depthPlusQueueSkip + 1]))
-                                {
-                                    depthPlusQueueSkip += 2;
-                                    depthPlusRoutingSkip++;
-
-                                    break;
-                                }
-
-                                depthPlusRoutingSkip++;
-                            }
-
-                            continue;
-                        }
-
-                        matching = false;
-                    }
-
-                    depthPlusQueueSkip++;
-                    depthPlusRoutingSkip++;
-                }
-
-                if (matching)
-                {
-                    if(list == null)
-                    {
-                        list = new ArrayList<AMQQueue>(_wildCardBindingKey2queues.get(bindingKey));
-                    }
-                    else
-                    {
-                        list.addAll(_wildCardBindingKey2queues.get(bindingKey));
-                    }
-                }
-            }
-
+            return Collections.EMPTY_SET;
         }
-        if(!_simpleBindingKey2queues.isEmpty())
+        else
         {
-            List<AMQQueue> queues = _simpleBindingKey2queues.get(routingKey);
-            if(list == null)
-            {
-                if(queues == null)
-                {
-                    list =  Collections.EMPTY_LIST;
-                }
-                else
-                {
-                    list = new ArrayList<AMQQueue>(queues);
-                }
-            }
-            else if(queues != null)
+            Set<AMQQueue> queues = new HashSet<AMQQueue>();
+            for(TopicMatcherResult result : results)
             {
-                list.addAll(queues);
-            }
 
+                ((TopicExchangeResult)result).processMessage(message, queues);
+            }
+            return queues;
         }
 
-        return list;
 
     }
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java Sun May 11 08:22:03 2008
@@ -4,6 +4,7 @@
 import org.apache.qpid.framing.AMQShortStringTokenizer;
 
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /*
 *
@@ -27,6 +28,10 @@
 */
 public class TopicMatcherDFAState
 {
+    private static final AtomicInteger stateId = new AtomicInteger();
+
+    private final int _id = stateId.incrementAndGet();
+
     private final Collection<TopicMatcherResult> _results;
     private final Map<TopicWord, TopicMatcherDFAState> _nextStateMap;
     private static final byte TOPIC_DELIMITTER = (byte)'.';
@@ -233,4 +238,58 @@
     }
 
 
+    public String toString()
+    {
+        StringBuilder transitions = new StringBuilder();
+        for(Map.Entry<TopicWord, TopicMatcherDFAState> entry : _nextStateMap.entrySet())
+        {
+            transitions.append("[ ");
+            transitions.append(entry.getKey());
+            transitions.append("\t ->\t ");
+            transitions.append(entry.getValue()._id);
+            transitions.append(" ]\n");
+        }
+
+
+        return "[ State " + _id + " ]\n" + transitions + "\n";
+
+    }
+
+    public String reachableStates()
+    {
+        StringBuilder result = new StringBuilder("Start state: " + _id + "\n");
+
+        SortedSet<TopicMatcherDFAState> reachableStates =
+                new TreeSet<TopicMatcherDFAState>(new Comparator<TopicMatcherDFAState>()
+                                                        {
+                                                            public int compare(final TopicMatcherDFAState o1, final TopicMatcherDFAState o2)
+                                                            {
+                                                                return o1._id - o2._id;
+                                                            }
+                                                        });
+        reachableStates.add(this);
+
+        int count;
+
+        do
+        {
+            count = reachableStates.size();
+            Collection<TopicMatcherDFAState> originalStates = new ArrayList<TopicMatcherDFAState>(reachableStates);
+            for(TopicMatcherDFAState state : originalStates)
+            {
+                reachableStates.addAll(state._nextStateMap.values());
+            }
+        }
+        while(reachableStates.size() != count);
+
+
+
+        for(TopicMatcherDFAState state : reachableStates)
+        {
+            result.append(state.toString());
+        }
+
+        return result.toString();
+    }
+
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java Sun May 11 08:22:03 2008
@@ -20,6 +20,6 @@
 * under the License.
 *
 */
-public class TopicMatcherResult
+public interface TopicMatcherResult
 {
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java Sun May 11 08:22:03 2008
@@ -4,6 +4,8 @@
 import org.apache.qpid.framing.AMQShortStringTokenizer;
 
 import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.io.IOException;
 
 /*
 *
@@ -30,6 +32,7 @@
     private static final byte TOPIC_DELIMITER = (byte)'.';
 
     private final TopicWordDictionary _dictionary = new TopicWordDictionary();
+    private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<TopicMatcherDFAState>();
 
     private static class Position
     {
@@ -37,6 +40,7 @@
         private final boolean _selfTransition;
         private final int _position;
         private final boolean _endState;
+        private boolean _followedByAnyLoop;
 
 
         public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState)
@@ -59,6 +63,43 @@
     }
 
 
+    public void addBinding(AMQShortString bindingKey, TopicMatcherResult result)
+    {
+
+        TopicMatcherDFAState startingStateMachine;
+        TopicMatcherDFAState newStateMachine;
+
+        do
+        {
+            startingStateMachine = _stateMachine.get();
+            if(startingStateMachine == null)
+            {
+                newStateMachine = createStateMachine(bindingKey, result);
+            }
+            else
+            {
+                newStateMachine = startingStateMachine.mergeStateMachines(createStateMachine(bindingKey, result));
+            }
+
+        }
+        while(!_stateMachine.compareAndSet(startingStateMachine,newStateMachine));
+
+    }
+
+    public Collection<TopicMatcherResult> parse(AMQShortString routingKey)
+    {
+        TopicMatcherDFAState stateMachine = _stateMachine.get();
+        if(stateMachine == null)
+        {
+            return Collections.EMPTY_SET;
+        }
+        else
+        {
+            return stateMachine.parse(_dictionary,routingKey);
+        }
+    }
+
+
     TopicMatcherDFAState createStateMachine(AMQShortString bindingKey, TopicMatcherResult result)
     {
         List<TopicWord> wordList = createTopicWordList(bindingKey);
@@ -108,7 +149,6 @@
         }
 
 
-
         int pos = 0;
         int wordPos = 0;
 
@@ -131,6 +171,31 @@
 
         }
 
+
+        for(int p = 0; p<positionCount; p++)
+        {
+            boolean followedByWildcards = true;
+
+            int n = p;
+            while(followedByWildcards && n<(positionCount+1))
+            {
+
+                if(positions[n]._selfTransition)
+                {
+                    break;
+                }
+                else if(positions[n]._word!=TopicWord.ANY_WORD)
+                {
+                    followedByWildcards = false;
+                }
+                n++;
+            }
+
+
+            positions[p]._followedByAnyLoop = followedByWildcards && (n!= positionCount+1);
+        }
+
+
         // from each position you transition to a set of other positions.
         // we approach this by examining steps of increasing length - so we
         // look how far we can go from the start position in 1 word, 2 words, etc...
@@ -258,6 +323,32 @@
             {
                 dest.setValue(Collections.singleton(loopingTerminal));
             }
+            else
+            {
+                Position anyLoop = null;
+                for(Position destPos : dest.getValue())
+                {
+                    if(destPos._followedByAnyLoop)
+                    {
+                        if(anyLoop == null || anyLoop._position<destPos._position)
+                        {
+                            anyLoop = destPos;
+                        }
+                    }
+                }
+                if(anyLoop != null)
+                {
+                    Collection<Position> removals = new ArrayList<Position>();
+                    for(Position destPos : dest.getValue())
+                    {
+                        if(destPos._position < anyLoop._position)
+                        {
+                            removals.add(destPos);
+                        }
+                    }
+                    dest.getValue().removeAll(removals);
+                }
+            }
 
             SimpleState stateForEntry = stateMap.get(dest.getValue());
             if(stateForEntry == null)
@@ -332,8 +423,65 @@
 
     public static void main(String[] args)
     {
+
+        printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.*.q.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
+        printMatches(new String[]{
+                        "#.a.#",
+                        "#.b.#",
+                        "#.c.#",
+                        "#.d.#",
+                        "#.e.#",
+                        "#.f.#",
+                        "#.g.#",
+                        "#.h.#",
+                        "#.i.#",
+                        "#.j.#",
+                        "#.k.#",
+                        "#.l.#",
+                        "#.m.#",
+                        "#.n.#",
+                        "#.o.#",
+                        "#.p.#",
+                        "#.q.#"
+
+        }, "a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");        
+/*
+        printMatches(new String[]{
+                "#.a.#",
+                "#.b.#",
+                "#.c.#",
+                "#.d.#",
+                "#.e.#",
+                "#.f.#",
+                "#.g.#",
+                "#.h.#",
+                "#.i.#",
+                "#.j.#",
+                "#.k.#",
+                "#.l.#",
+                "#.m.#",
+                "#.n.#",
+                "#.o.#",
+                "#.p.#",
+                "#.q.#",
+                "#.r.#",
+                "#.s.#",
+                "#.t.#",
+                "#.u.#",
+                "#.v.#",
+                "#.w.#",
+                "#.x.#",
+                "#.y.#",
+                "#.z.#"
+
+
+        },"a.b");
+
+        printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
+        printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
         printMatches("a.#.b.#","a.b.b.b.b.b.b.b.c");
 
+*/
 
         printMatches("","");
         printMatches("a","a");
@@ -394,12 +542,19 @@
 
         TopicParser parser = new TopicParser();
 
+        long start = System.currentTimeMillis();
         for(int i = 0; i < bindingKeys.length; i++)
         {
-            TopicMatcherResult r = new TopicMatcherResult();
+            System.out.println((System.currentTimeMillis() - start) + ":\t" + bindingKeys[i]);
+            TopicMatcherResult r = new TopicMatcherResult(){};
             resultMap.put(r, bindingKeys[i]);
             AMQShortString bindingKeyShortString = new AMQShortString(bindingKeys[i]);
 
+            System.err.println("=====================================================");
+            System.err.println("Adding binding key: " + bindingKeyShortString);
+            System.err.println("-----------------------------------------------------");
+
+
             if(i==0)
             {
                 sm = parser.createStateMachine(bindingKeyShortString, r);
@@ -408,6 +563,16 @@
             {
                 sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeyShortString, r));
             }
+            System.err.println(sm.reachableStates());
+            System.err.println("=====================================================");
+            try
+            {
+                System.in.read();
+            }
+            catch (IOException e)
+            {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
         }
         AMQShortString routingKeyShortString = new AMQShortString(routingKey);
 
@@ -438,7 +603,7 @@
         AMQShortString routingKeyShortString = new AMQShortString(routingKey);
         TopicParser parser = new TopicParser();
 
-        final TopicMatcherResult result = new TopicMatcherResult();
+        final TopicMatcherResult result = new TopicMatcherResult(){};
 
         TopicMatcherDFAState sm = parser.createStateMachine(bindingKeyShortString, result);
         return !sm.parse(parser._dictionary,routingKeyShortString).isEmpty();

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java Sun May 11 08:22:03 2008
@@ -28,7 +28,27 @@
 */
 public final class TopicWord
 {
-    public static final TopicWord ANY_WORD = new TopicWord();
-    public static final TopicWord WILDCARD_WORD = new TopicWord();
+    public static final TopicWord ANY_WORD = new TopicWord("*");
+    public static final TopicWord WILDCARD_WORD = new TopicWord("#");
+    private String _word;
 
+    public TopicWord()
+    {
+
+    }
+
+    public TopicWord(String s)
+    {
+        _word = s;
+    }
+
+    public TopicWord(final AMQShortString name)
+    {
+        _word = name.toString();
+    }
+
+    public String toString()
+    {
+        return _word;
+    }
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java Sun May 11 08:22:03 2008
@@ -42,7 +42,7 @@
 
     public TopicWord getOrCreateWord(AMQShortString name)
     {
-        TopicWord word = _dictionary.putIfAbsent(name, new TopicWord());
+        TopicWord word = _dictionary.putIfAbsent(name, new TopicWord(name));
         if(word == null)
         {
             word = _dictionary.get(name);

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java Sun May 11 08:22:03 2008
@@ -21,12 +21,12 @@
 //
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * An expression which performs an operation on two expression values
  */
-public abstract class ArithmeticExpression extends BinaryExpression
+public abstract class ArithmeticExpression<E extends Exception> extends BinaryExpression<E>
 {
 
     protected static final int INTEGER = 1;
@@ -248,7 +248,7 @@
         }
     }
 
-    public Object evaluate(AMQMessage message) throws AMQException
+    public Object evaluate(Filterable<E> message) throws E
     {
         Object lvalue = left.evaluate(message);
         if (lvalue == null)

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java Sun May 11 08:22:03 2008
@@ -23,23 +23,23 @@
 /**
  * An expression which performs an operation on two expression values.
  */
-public abstract class BinaryExpression implements Expression
+public abstract class BinaryExpression<E extends Exception> implements Expression<E>
 {
-    protected Expression left;
-    protected Expression right;
+    protected Expression<E> left;
+    protected Expression<E> right;
 
-    public BinaryExpression(Expression left, Expression right)
+    public BinaryExpression(Expression<E> left, Expression<E> right)
     {
         this.left = left;
         this.right = right;
     }
 
-    public Expression getLeft()
+    public Expression<E> getLeft()
     {
         return left;
     }
 
-    public Expression getRight()
+    public Expression<E> getRight()
     {
         return right;
     }
@@ -90,7 +90,7 @@
     /**
      * @param expression
      */
-    public void setRight(Expression expression)
+    public void setRight(Expression<E> expression)
     {
         right = expression;
     }
@@ -98,7 +98,7 @@
     /**
      * @param expression
      */
-    public void setLeft(Expression expression)
+    public void setLeft(Expression<E> expression)
     {
         left = expression;
     }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java Sun May 11 08:22:03 2008
@@ -22,19 +22,20 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * A BooleanExpression is an expression that always
  * produces a Boolean result.
  */
-public interface BooleanExpression extends Expression
+public interface BooleanExpression<E extends Exception> extends Expression<E>
 {
 
     /**
      * @param message
      * @return true if the expression evaluates to Boolean.TRUE.
-     * @throws AMQException
+     * @throws E
      */
-    public boolean matches(AMQMessage message) throws AMQException;
+    public boolean matches(Filterable<E> message) throws E;
 
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java Sun May 11 08:22:03 2008
@@ -29,19 +29,20 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * A filter performing a comparison of two objects
  */
-public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression
+public abstract class ComparisonExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E>
 {
 
-    public static BooleanExpression createBetween(Expression value, Expression left, Expression right)
+    public static<E extends Exception> BooleanExpression<E> createBetween(Expression<E> value, Expression left, Expression<E> right)
     {
         return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right));
     }
 
-    public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right)
+    public static<E extends Exception> BooleanExpression<E> createNotBetween(Expression<E> value, Expression<E> left, Expression<E> right)
     {
         return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
     }
@@ -72,7 +73,7 @@
         REGEXP_CONTROL_CHARS.add(new Character('!'));
     }
 
-    static class LikeExpression extends UnaryExpression implements BooleanExpression
+    static class LikeExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E>
     {
 
         Pattern likePattern;
@@ -80,7 +81,7 @@
         /**
          * @param right
          */
-        public LikeExpression(Expression right, String like, int escape)
+        public LikeExpression(Expression<E> right, String like, int escape)
         {
             super(right);
 
@@ -137,7 +138,7 @@
         /**
          *  org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
          */
-        public Object evaluate(AMQMessage message) throws AMQException
+        public Object evaluate(Filterable<E> message) throws E
         {
 
             Object rv = this.getRight().evaluate(message);
@@ -157,7 +158,7 @@
             return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
         }
 
-        public boolean matches(AMQMessage message) throws AMQException
+        public boolean matches(Filterable<E> message) throws E
         {
             Object object = evaluate(message);
 
@@ -235,45 +236,9 @@
         return doCreateEqual(left, right);
     }
 
-    private static BooleanExpression doCreateEqual(Expression left, Expression right)
+    private static<E extends Exception> BooleanExpression<E> doCreateEqual(Expression<E> left, Expression<E> right)
     {
-        return new ComparisonExpression(left, right)
-            {
-
-                public Object evaluate(AMQMessage message) throws AMQException
-                {
-                    Object lv = left.evaluate(message);
-                    Object rv = right.evaluate(message);
-
-                    // Iff one of the values is null
-                    if ((lv == null) ^ (rv == null))
-                    {
-                        return Boolean.FALSE;
-                    }
-
-                    if ((lv == rv) || lv.equals(rv))
-                    {
-                        return Boolean.TRUE;
-                    }
-
-                    if ((lv instanceof Comparable) && (rv instanceof Comparable))
-                    {
-                        return compare((Comparable) lv, (Comparable) rv);
-                    }
-
-                    return Boolean.FALSE;
-                }
-
-                protected boolean asBoolean(int answer)
-                {
-                    return answer == 0;
-                }
-
-                public String getExpressionSymbol()
-                {
-                    return "=";
-                }
-            };
+        return new EqualExpression(left, right);
     }
 
     public static BooleanExpression createGreaterThan(final Expression left, final Expression right)
@@ -423,7 +388,7 @@
         super(left, right);
     }
 
-    public Object evaluate(AMQMessage message) throws AMQException
+    public Object evaluate(Filterable<E> message) throws E
     {
         Comparable lv = (Comparable) left.evaluate(message);
         if (lv == null)
@@ -585,11 +550,52 @@
 
     protected abstract boolean asBoolean(int answer);
 
-    public boolean matches(AMQMessage message) throws AMQException
+    public boolean matches(Filterable<E> message) throws E
     {
         Object object = evaluate(message);
 
         return (object != null) && (object == Boolean.TRUE);
     }
 
+    private static class EqualExpression<E extends Exception> extends ComparisonExpression<E>
+    {
+        public EqualExpression(final Expression<E> left, final Expression<E> right)
+        {
+            super(left, right);
+        }
+
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            Object lv = left.evaluate(message);
+            Object rv = right.evaluate(message);
+
+            // Iff one of the values is null
+            if ((lv == null) ^ (rv == null))
+            {
+                return Boolean.FALSE;
+            }
+
+            if ((lv == rv) || lv.equals(rv))
+            {
+                return Boolean.TRUE;
+            }
+
+            if ((lv instanceof Comparable) && (rv instanceof Comparable))
+            {
+                return compare((Comparable) lv, (Comparable) rv);
+            }
+
+            return Boolean.FALSE;
+        }
+
+        protected boolean asBoolean(int answer)
+        {
+            return answer == 0;
+        }
+
+        public String getExpressionSymbol()
+        {
+            return "=";
+        }
+    }
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java Sun May 11 08:22:03 2008
@@ -27,21 +27,22 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * Represents a constant expression
  */
-public class ConstantExpression implements Expression
+public class ConstantExpression<E extends Exception> implements Expression<E>
 {
 
-    static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression
+    static class BooleanConstantExpression<E extends Exception> extends ConstantExpression<E> implements BooleanExpression<E>
     {
         public BooleanConstantExpression(Object value)
         {
             super(value);
         }
 
-        public boolean matches(AMQMessage message) throws AMQException
+        public boolean matches(Filterable<E> message) throws E
         {
             Object object = evaluate(message);
 
@@ -120,7 +121,7 @@
         this.value = value;
     }
 
-    public Object evaluate(AMQMessage message) throws AMQException
+    public Object evaluate(Filterable<E> message) throws E
     {
         return value;
     }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java Sun May 11 08:22:03 2008
@@ -22,16 +22,17 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * Represents an expression
  */
-public interface Expression
+public interface Expression<E extends Exception>
 {
 
     /**
      * @return the value of this expression
      */
-    public Object evaluate(AMQMessage message) throws AMQException;
+    public Object evaluate(Filterable<E> message) throws E;
 
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java Sun May 11 08:22:03 2008
@@ -24,14 +24,16 @@
 //
 
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
+import org.apache.qpid.AMQException;
 
-public interface FilterManager
+public interface FilterManager<E extends Exception>
 {
-    void add(MessageFilter filter);
+    void add(MessageFilter<E> filter);
 
-    void remove(MessageFilter filter);
+    void remove(MessageFilter<E> filter);
 
-    boolean allAllow(AMQMessage msg);
+    boolean allAllow(Filterable<E>  msg);
 
     boolean hasFilters();
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java Sun May 11 08:22:03 2008
@@ -39,7 +39,7 @@
         if (filters != null)
         {
 
-            manager = new SimpleFilterManager();
+
 
             if(filters.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()))
             {
@@ -47,23 +47,13 @@
 
                 if (selector != null && !selector.equals(""))
                 {
+                    manager = new SimpleFilterManager();
                     manager.add(new JMSSelectorFilter(selector));
                 }
 
             }
 
-            if (filters.containsKey(AMQPFilterTypes.NO_CONSUME.getValue()))
-            {
-                manager.add(new NoConsumerFilter());
-            }
-
 
-
-            //If we added no filters don't bear the overhead of having an filter manager
-            if (!manager.hasFilters())
-            {
-                manager = null;
-            }
         }
         else
         {

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Sun May 11 08:22:03 2008
@@ -23,42 +23,30 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.filter.jms.selector.SelectorParser;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 
-
-public class JMSSelectorFilter implements MessageFilter
+public class JMSSelectorFilter<E extends Exception> implements MessageFilter<E>
 {
     private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
 
     private String _selector;
-    private BooleanExpression _matcher;
+    private BooleanExpression<E> _matcher;
 
     public JMSSelectorFilter(String selector) throws AMQException
     {
         _selector = selector;
-        _logger.info("Created JMSSelectorFilter with selector:" + _selector);
-
-
         _matcher = new SelectorParser().parse(selector);
-
-
     }
 
-    public boolean matches(AMQMessage message)
+    public boolean matches(Filterable<E> message) throws E
     {
-        try
-        {
-            boolean match = _matcher.matches(message);
-            _logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
-            return match;
-        }
-        catch (AMQException e)
+        boolean match = _matcher.matches(message);
+        if(_logger.isDebugEnabled())
         {
-            //fixme this needs to be sorted.. it shouldn't happen
-            e.printStackTrace();  
+            _logger.debug(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
         }
-        return false;
+        return match;
     }
 
     public String getSelector()