You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/07/20 21:05:08 UTC

svn commit: r795958 [1/3] - in /qpid/branches/java-broker-0-10/qpid/java: broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/src/main/java/org/apache/q...

Author: rgodfrey
Date: Mon Jul 20 19:05:05 2009
New Revision: 795958

URL: http://svn.apache.org/viewvc?rev=795958&view=rev
Log:
Java Broker 0-10 Exploratory work

Added:
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java
Modified:
    qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
    qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
    qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
    qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java

Modified: qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Mon Jul 20 19:05:05 2009
@@ -20,10 +20,7 @@
  */
 package org.apache.qpid.extras.exchanges.diagnostic;
 
-import java.util.List;
-import java.util.Map;
 import java.util.ArrayList;
-import java.util.Collection;
 
 import javax.management.JMException;
 import javax.management.openmbean.OpenDataException;
@@ -34,8 +31,8 @@
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.AbstractExchange;
-import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.message.InboundMessage;
 
 import org.apache.qpid.junit.extensions.util.SizeOf;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
@@ -193,20 +190,20 @@
         return false;
     }
 
-    public void route(IncomingMessage payload) throws AMQException
+    public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
     {
         
         Long value = new Long(SizeOf.getUsedMemory());
         AMQShortString key = new AMQShortString("memory");
         
-        FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).getHeaders();
+        FieldTable headers = ((BasicContentHeaderProperties)payload.getMessageHeader().properties).getHeaders();
         headers.put(key, value);
-        ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
+        ((BasicContentHeaderProperties)payload.getMessageHeader().properties).setHeaders(headers);
         AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
 
         ArrayList<AMQQueue> queues =  new ArrayList<AMQQueue>();
         queues.add(q);
-        payload.enqueue(queues);
+        return queues;
         
     }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java Mon Jul 20 19:05:05 2009
@@ -23,14 +23,15 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.ArrayList;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
 
 public class TestExchange implements Exchange
 {
@@ -63,6 +64,16 @@
         return false;
     }
 
+    public boolean isBound(String bindingKey, AMQQueue queue)
+    {
+        return false;
+    }
+
+    public boolean isBound(String bindingKey)
+    {
+        return false;
+    }
+
     public void initialise(VirtualHost host, AMQShortString name, boolean durable, boolean autoDelete)
             throws AMQException
     {
@@ -102,8 +113,9 @@
     {
     }
 
-    public void route(IncomingMessage message) throws AMQException
+    public ArrayList<AMQQueue> route(InboundMessage message) throws AMQException
     {
+        return new ArrayList<AMQQueue>();
     }
 
     public int getTicket()

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Mon Jul 20 19:05:05 2009
@@ -57,6 +57,7 @@
 import org.apache.qpid.server.txn.LocalTransactionalContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.message.ServerMessage;
 
 public class AMQChannel
 {
@@ -157,27 +158,35 @@
     public void publishContentHeader(ContentHeaderBody contentHeaderBody)
             throws AMQException
     {
-        if (_currentMessage == null)
-        {
-            throw new AMQException("Received content header without previously receiving a BasicPublish frame");
-        }
-        else
+        StoreContext.setCurrentContext(_storeContext);
+        try
         {
-            if (_log.isDebugEnabled())
+            if (_currentMessage == null)
             {
-                _log.debug("Content header received on channel " + _channelId);
+                throw new AMQException("Received content header without previously receiving a BasicPublish frame");
             }
+            else
+            {
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug("Content header received on channel " + _channelId);
+                }
 
-            _currentMessage.setContentHeaderBody(contentHeaderBody);
+                _currentMessage.setContentHeaderBody(contentHeaderBody);
 
-            _currentMessage.setExpiration();
+                _currentMessage.setExpiration();
 
-            routeCurrentMessage();
+                routeCurrentMessage();
 
-            _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
+                _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
 
-            deliverCurrentMessageIfComplete();
+                deliverCurrentMessageIfComplete();
 
+            }
+        }
+        finally
+        {
+            StoreContext.clearCurrentContext();
         }
     }
 
@@ -212,6 +221,7 @@
 
     public void publishContentBody(ContentBody contentBody) throws AMQException
     {
+        StoreContext.setCurrentContext(_storeContext);
         if (_currentMessage == null)
         {
             throw new AMQException("Received content body without previously receiving a JmsPublishBody");
@@ -231,6 +241,7 @@
                     _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
                             contentBody));
 
+
             deliverCurrentMessageIfComplete();
         }
         catch (AMQException e)
@@ -240,6 +251,10 @@
             _currentMessage = null;
             throw e;
         }
+        finally
+        {
+            StoreContext.clearCurrentContext();
+        }
     }
 
     protected void routeCurrentMessage() throws AMQException
@@ -425,7 +440,7 @@
         {
             if (entry.getQueue() == null)
             {
-                _log.debug("Adding unacked message with a null queue:" + entry.debugIdentity());
+                _log.debug("Adding unacked message with a null queue:" + entry);
             }
             else
             {
@@ -487,7 +502,7 @@
             if (!unacked.isQueueDeleted())
             {
                 // Mark message redelivered
-                unacked.getMessage().setRedelivered(true);
+                unacked.setRedelivered(true);
 
                 // Ensure message is released for redelivery
                 unacked.release();
@@ -518,7 +533,7 @@
         if (unacked != null)
         {
             // Mark message redelivered
-            unacked.getMessage().setRedelivered(true);
+            unacked.setRedelivered(true);
 
             // Ensure message is released for redelivery
             if (!unacked.isQueueDeleted())
@@ -551,7 +566,7 @@
             }
             else
             {
-                _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
+                _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
                           + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
 
                 unacked.discard(_storeContext);
@@ -612,7 +627,7 @@
 
 
 
-            AMQMessage msg = message.getMessage();
+            ServerMessage msg = message.getMessage();
             AMQQueue queue = message.getQueue();
 
             // Our Java Client will always suspend the channel when resending!
@@ -631,7 +646,7 @@
 
             // Without any details from the client about what has been processed we have to mark
             // all messages in the unacked map as redelivered.
-            msg.setRedelivered(true);
+            message.setRedelivered(true);
 
             Subscription sub = message.getDeliveredSubscription();
 
@@ -829,14 +844,25 @@
     {
         if (!_returnMessages.isEmpty())
         {
+            StoreContext sc =StoreContext.setCurrentContext(_storeContext);
             for (RequiredDeliveryException bouncedMessage : _returnMessages)
             {
-                AMQMessage message = bouncedMessage.getAMQMessage();
-                _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
-                                                                 new AMQShortString(bouncedMessage.getMessage()));
+                ServerMessage serverMessage = bouncedMessage.getAMQMessage();
+                if(serverMessage instanceof AMQMessage)
+                {
+                    AMQMessage message = (AMQMessage) serverMessage;
+                    _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+                                                                     new AMQShortString(bouncedMessage.getMessage()));
 
-                message.decrementReference(_storeContext);
+                }
+                else
+                {
+                    // TODO AMQP 0-10 Message
+                    throw new RuntimeException("not yet implemented conversion of 0-10 messages");
+                }
+                bouncedMessage.release();
             }
+            StoreContext.setCurrentContext(sc);
 
             _returnMessages.clear();
         }
@@ -884,8 +910,18 @@
             public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
                     throws AMQException
             {
-               getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag());
+                ServerMessage msg = entry.getMessage();
+                if(msg instanceof AMQMessage)
+                {
+                    getProtocolSession().getProtocolOutputConverter().writeDeliver((AMQMessage)msg, getChannelId(),
+                                                                                   deliveryTag, sub.getConsumerTag());
+                }
+                else
+                {
+                    //TODO - Convert 0-10 Message into 0-8/9 message
+                }
             }
+
         };
 
     public ClientDeliveryMethod getClientDeliveryMethod()

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java Mon Jul 20 19:05:05 2009
@@ -26,6 +26,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
@@ -56,9 +57,8 @@
 
     public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
     {
-
-        AMQMessage msg = message.getMessage();
-        msg.setRedelivered(true);
+        
+        message.setRedelivered(true);
         final Subscription subscription = message.getDeliveredSubscription();
         if (subscription != null)
         {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Mon Jul 20 19:05:05 2009
@@ -45,6 +45,9 @@
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.mina.util.NewThreadExecutor;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.*;
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.pool.ReadWriteThreadModel;
@@ -56,6 +59,9 @@
 import org.apache.qpid.server.protocol.AMQPProtocolProvider;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.transport.ServerConnection;
 
 /**
  * Main entry point for AMQPD.
@@ -314,6 +320,29 @@
         }
         
         bind(port, serverConfig);
+
+
+        IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+
+        final ConnectionDelegate delegate =
+                new org.apache.qpid.server.transport.ServerConnectionDelegate(appRegistry, "localhost");
+
+
+        ConnectionBinding cb = new ConnectionBinding()
+        {
+            public Connection connection()
+            {
+                ServerConnection conn = new ServerConnection();
+                conn.setConnectionDelegate(delegate);
+                return conn;
+            }
+        };
+
+        int port_0_10 = port + 1;
+
+        org.apache.qpid.transport.network.io.IoAcceptor ioa = new org.apache.qpid.transport.network.io.IoAcceptor
+            ("0.0.0.0", port_0_10, cb);
+        ioa.start();
     }
 
     /**

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Mon Jul 20 19:05:05 2009
@@ -23,6 +23,9 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessageReference;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
 
 /**
  * Signals that a required delivery could not be made. This could be because of the immediate flag being set and the
@@ -39,9 +42,9 @@
  */
 public abstract class RequiredDeliveryException extends AMQException
 {
-    private AMQMessage _amqMessage;
+    private MessageReference _amqMessage;
 
-    public RequiredDeliveryException(String message, AMQMessage payload)
+    public RequiredDeliveryException(String message, ServerMessage payload)
     {
         super(message);
 
@@ -54,20 +57,20 @@
         super(message);
     }
 
-    public void setMessage(final AMQMessage payload)
+    public void setMessage(final ServerMessage payload)
     {
 
         // Increment the reference as this message is in the routing phase
         // and so will have the ref decremented as routing fails.
         // we need to keep this message around so we can return it in the
         // handler. So increment here.
-        _amqMessage = payload.takeReference();
+        _amqMessage = payload.newReference();
 
     }
 
-    public AMQMessage getAMQMessage()
+    public ServerMessage getAMQMessage()
     {
-        return _amqMessage;
+        return _amqMessage.getMessage();
     }
 
     public AMQConstant getErrorCode()
@@ -76,4 +79,9 @@
     }
 
     public abstract AMQConstant getReplyCode();
+
+    public void release()
+    {
+        _amqMessage.release();
+    }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Mon Jul 20 19:05:05 2009
@@ -130,7 +130,7 @@
         //in memory (persistent changes will be rolled back by store)
         for (QueueEntry msg : _unacked.values())
         {
-            msg.getMessage().takeReference();
+            // TODO - should requeue, whole thing is messed up
         }
     }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Mon Jul 20 19:05:05 2009
@@ -39,6 +39,7 @@
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -204,4 +205,15 @@
     {
         return getVirtualHost().getQueueRegistry();
     }
+
+    public boolean isBound(String bindingKey, AMQQueue queue)
+    {
+        return isBound(new AMQShortString(bindingKey), queue);
+    }
+
+    public boolean isBound(String bindingKey)
+    {
+        return isBound(new AMQShortString(bindingKey));
+    }
+
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Mon Jul 20 19:05:05 2009
@@ -24,7 +24,6 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.protocol.ExchangeInitialiser;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -57,6 +56,19 @@
         new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this);
     }
 
+    /**
+     * Looks up an exchange from a plain {@link String} name by wrapping the name
+     * in an {@link AMQShortString} and delegating to the short-string lookup.
+     *
+     * @param exchangeName exchange name in string form
+     * @return whatever the {@code AMQShortString} overload returns
+     */
+    public Exchange getExchange(String exchangeName)
+    {
+        final AMQShortString shortName = new AMQShortString(exchangeName);
+        return getExchange(shortName);
+    }
+
     public MessageStore getMessageStore()
     {
         return _host.getMessageStore();
@@ -134,6 +146,6 @@
         {
             throw new AMQException("Exchange '" + exchange + "' does not exist");
         }
-        exch.route(payload);
+        payload.enqueue(exch.route(payload));
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Mon Jul 20 19:05:05 2009
@@ -40,9 +40,9 @@
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
 
 public class DirectExchange extends AbstractExchange
 {
@@ -192,10 +192,10 @@
         }
     }
 
-    public void route(IncomingMessage payload) throws AMQException
+    public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
     {
 
-        final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey();
+        final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : new AMQShortString(payload.getRoutingKey());
 
         final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
 
@@ -204,7 +204,8 @@
             _logger.debug("Publishing message to queue " + queues);
         }
 
-        payload.enqueue(queues);
+        return queues;
+
 
 
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Mon Jul 20 19:05:05 2009
@@ -24,12 +24,11 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 
-import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
 
-import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
 
 public interface Exchange
 {
@@ -54,7 +53,7 @@
 
     void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
 
-    void route(IncomingMessage message) throws AMQException;
+    ArrayList<AMQQueue> route(InboundMessage message) throws AMQException;
 
 
     /**
@@ -93,6 +92,8 @@
      */
     boolean hasBindings();
 
-    
 
+    boolean isBound(String bindingKey, AMQQueue queue);
+
+    boolean isBound(String bindingKey);
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Mon Jul 20 19:05:05 2009
@@ -48,4 +48,6 @@
     Collection<AMQShortString> getExchangeNames();
 
     void initialise() throws AMQException;
+
+    Exchange getExchange(String exchangeName);
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Mon Jul 20 19:05:05 2009
@@ -28,9 +28,9 @@
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -182,7 +182,7 @@
         }
     }
 
-    public void route(IncomingMessage payload) throws AMQException
+    public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
     {
 
     
@@ -191,7 +191,7 @@
             _logger.debug("Publishing message to queue " + _queues);
         }
 
-        payload.enqueue(new ArrayList(_queues));
+        return new ArrayList(_queues);
 
     }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Mon Jul 20 19:05:05 2009
@@ -28,6 +28,7 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.framing.AMQTypedValue;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.message.AMQMessageHeader;
 
 /**
  * Defines binding and matching based on a set of headers.
@@ -139,7 +140,7 @@
      * @return true if the headers define any required keys and match any required
      * values
      */
-    public boolean matches(FieldTable headers)
+    public boolean matches(AMQMessageHeader headers)
     {
         if(headers == null)
         {
@@ -151,13 +152,13 @@
         }
     }
 
-    private boolean and(FieldTable headers)
+    private boolean and(AMQMessageHeader headers)
     {
-        if(headers.keys().containsAll(required))
+        if(headers.containsHeaders(required))
         {
             for(Map.Entry<String, Object> e : matches.entrySet())
             {
-                if(!e.getValue().equals(headers.getObject(e.getKey())))
+                if(!e.getValue().equals(headers.getHeader(e.getKey())))
                 {
                     return false;
                 }
@@ -171,11 +172,11 @@
     }
 
 
-    private boolean or(final FieldTable headers)
+    private boolean or(final AMQMessageHeader headers)
     {
-        if(required.isEmpty() || !(Boolean) headers.processOverElements(new RequiredOrProcessor()))
+        if(required.isEmpty() || passesRequiredOr(headers))
         {
-            return ((!matches.isEmpty()) && (Boolean) headers.processOverElements(new MatchesOrProcessor()))
+            return ((!matches.isEmpty()) && passesMatchesOr(headers))
                     || (required.isEmpty() && matches.isEmpty());
         }
         else
@@ -184,6 +185,53 @@
         }
     }
 
+    /**
+     * "Any" check over the value-constrained bindings: returns true as soon as
+     * one entry of {@code matches} is present in the message headers with an
+     * equal value, mirroring the first-hit behaviour of
+     * {@link #passesRequiredOr(AMQMessageHeader)}.
+     *
+     * @param headers headers of the message being routed
+     * @return true if at least one expected key/value pair is satisfied;
+     *         false if none is (including when {@code matches} is empty)
+     */
+    private boolean passesMatchesOr(AMQMessageHeader headers)
+    {
+        for(Map.Entry<String,Object> entry : matches.entrySet())
+        {
+            if(headers.containsHeader(entry.getKey()))
+            {
+                final Object expected = entry.getValue();
+                final Object actual = headers.getHeader(entry.getKey());
+                // null-safe: a null expectation is only satisfied by a null header value
+                if(expected == null ? actual == null : expected.equals(actual))
+                {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * "Any" check over the presence-only bindings: returns true as soon as one
+     * name in {@code required} is found among the message headers.
+     *
+     * @param headers headers of the message being routed
+     * @return true if at least one required header is present, false otherwise
+     */
+    private boolean passesRequiredOr(AMQMessageHeader headers)
+    {
+        for(String name : required)
+        {
+            if(headers.containsHeader(name))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private void processSpecial(String key, Object value)
     {
         if("X-match".equalsIgnoreCase(key))

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Mon Jul 20 19:05:05 2009
@@ -31,9 +31,10 @@
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
 
 import javax.management.JMException;
 import javax.management.openmbean.ArrayType;
@@ -50,7 +51,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Collection;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
@@ -237,31 +237,31 @@
         }
     }
 
-    public void route(IncomingMessage payload) throws AMQException
+    public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
     {
-        FieldTable headers = getHeaders(payload.getContentHeaderBody());
+        AMQMessageHeader header = payload.getMessageHeader();
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
+            _logger.debug("Exchange " + getName() + ": routing message with headers " + header);
         }
         boolean routed = false;
         ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
         for (Registration e : _bindings)
         {
 
-            if (e.binding.matches(headers))
+            if (e.binding.matches(header))
             {
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Exchange " + getName() + ": delivering message with headers " +
-                                  headers + " to " + e.queue.getName());
+                                  header + " to " + e.queue.getName());
                 }
                 queues.add(e.queue);
 
                 routed = true;
             }
         }
-        payload.enqueue(queues);
+        return queues;
     }
 
     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Mon Jul 20 19:05:05 2009
@@ -30,13 +30,13 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortStringTokenizer;
-import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.exchange.topic.TopicParser;
 import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.message.InboundMessage;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -109,7 +109,7 @@
 
     private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
 
-    private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>();
+    private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
 
     public static class Binding
     {
@@ -160,7 +160,7 @@
     private final class TopicExchangeResult implements TopicMatcherResult
     {
         private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
-        private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>();
+        private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
 
         public void addUnfilteredQueue(AMQQueue queue)
         {
@@ -190,12 +190,12 @@
         }
 
 
-        public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+        public void addFilteredQueue(AMQQueue queue, MessageFilter filter)
         {
-            Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+            Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
             if(filters == null)
             {
-                filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>();
+                filters = new ConcurrentHashMap<MessageFilter,Integer>();
                 _filteredQueues.put(queue, filters);
             }
             Integer instances = filters.get(filter);
@@ -210,9 +210,9 @@
 
         }
 
-        public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+        public void removeFilteredQueue(AMQQueue queue, MessageFilter filter)
         {
-            Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+            Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
             if(filters != null)
             {
                 Integer instances = filters.get(filter);
@@ -237,11 +237,11 @@
         }
 
         public void replaceQueueFilter(AMQQueue queue,
-                                       MessageFilter<RuntimeException> oldFilter,
-                                       MessageFilter<RuntimeException> newFilter)
+                                       MessageFilter oldFilter,
+                                       MessageFilter newFilter)
         {
-            Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
-            Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters);
+            Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
+            Map<MessageFilter,Integer> newFilters = new ConcurrentHashMap<MessageFilter,Integer>(filters);
             Integer oldFilterInstances = filters.get(oldFilter);
             if(oldFilterInstances == 1)
             {
@@ -263,7 +263,7 @@
             _filteredQueues.put(queue,newFilters);
         }
 
-        public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
+        public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues)
         {
             if(queues == null)
             {
@@ -284,11 +284,11 @@
             queues.addAll(_unfilteredQueues.keySet());
             if(!_filteredQueues.isEmpty())
             {
-                for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet())
+                for(Map.Entry<AMQQueue, Map<MessageFilter, Integer>> entry : _filteredQueues.entrySet())
                 {
                     if(!queues.contains(entry.getKey()))
                     {
-                        for(MessageFilter<RuntimeException> filter : entry.getValue().keySet())
+                        for(MessageFilter filter : entry.getValue().keySet())
                         {
                             if(filter.matches(msg))
                             {
@@ -456,18 +456,18 @@
 
     }
 
-    private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args)
+    private JMSSelectorFilter createSelectorFilter(final FieldTable args)
             throws AMQException
     {
 
         final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
-        WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString);
+        WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
         JMSSelectorFilter selector = null;
 
         if(selectorRef == null || (selector = selectorRef.get())==null)
         {
-            selector = new JMSSelectorFilter<RuntimeException>(selectorString);
-            _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
+            selector = new JMSSelectorFilter(selectorString);
+            _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
         }
         return selector;
     }
@@ -528,10 +528,12 @@
         return normalizedString;
     }
 
-    public void route(IncomingMessage payload) throws AMQException
+    public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
     {
 
-        final AMQShortString routingKey = payload.getRoutingKey();
+        final AMQShortString routingKey = payload.getRoutingKey() == null
+                                          ? AMQShortString.EMPTY_STRING
+                                          : new AMQShortString(payload.getRoutingKey());
 
         // The copy here is unfortunate, but not too bad relevant to the amount of
         // things created and copied in getMatchedQueues
@@ -543,7 +545,7 @@
             _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
         }
 
-        payload.enqueue(queues);
+        return queues;
 
     }
 
@@ -646,7 +648,7 @@
         }
     }
 
-    private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
+    private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
     {
 
         Collection<TopicMatcherResult> results = _parser.parse(routingKey);

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java Mon Jul 20 19:05:05 2009
@@ -26,7 +26,7 @@
 /**
  * An expression which performs an operation on two expression values
  */
-public abstract class ArithmeticExpression<E extends Exception> extends BinaryExpression<E>
+public abstract class ArithmeticExpression extends BinaryExpression
 {
 
     protected static final int INTEGER = 1;
@@ -248,7 +248,7 @@
         }
     }
 
-    public Object evaluate(Filterable<E> message) throws E
+    public Object evaluate(Filterable message)
     {
         Object lvalue = left.evaluate(message);
         if (lvalue == null)

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java Mon Jul 20 19:05:05 2009
@@ -23,23 +23,23 @@
 /**
  * An expression which performs an operation on two expression values.
  */
-public abstract class BinaryExpression<E extends Exception> implements Expression<E>
+public abstract class BinaryExpression implements Expression
 {
-    protected Expression<E> left;
-    protected Expression<E> right;
+    protected Expression left;
+    protected Expression right;
 
-    public BinaryExpression(Expression<E> left, Expression<E> right)
+    public BinaryExpression(Expression left, Expression right)
     {
         this.left = left;
         this.right = right;
     }
 
-    public Expression<E> getLeft()
+    public Expression getLeft()
     {
         return left;
     }
 
-    public Expression<E> getRight()
+    public Expression getRight()
     {
         return right;
     }
@@ -90,7 +90,7 @@
     /**
      * @param expression
      */
-    public void setRight(Expression<E> expression)
+    public void setRight(Expression expression)
     {
         right = expression;
     }
@@ -98,7 +98,7 @@
     /**
      * @param expression
      */
-    public void setLeft(Expression<E> expression)
+    public void setLeft(Expression expression)
     {
         left = expression;
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java Mon Jul 20 19:05:05 2009
@@ -28,14 +28,13 @@
  * A BooleanExpression is an expression that always
  * produces a Boolean result.
  */
-public interface BooleanExpression<E extends Exception> extends Expression<E>
+public interface BooleanExpression extends Expression
 {
 
     /**
      * @param message
      * @return true if the expression evaluates to Boolean.TRUE.
-     * @throws E
      */
-    public boolean matches(Filterable<E> message) throws E;
+    public boolean matches(Filterable message);
 
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java Mon Jul 20 19:05:05 2009
@@ -34,15 +34,15 @@
 /**
  * A filter performing a comparison of two objects
  */
-public abstract class ComparisonExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E>
+public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression
 {
 
-    public static<E extends Exception> BooleanExpression<E> createBetween(Expression<E> value, Expression left, Expression<E> right)
+    public static BooleanExpression createBetween(Expression value, Expression left, Expression right)
     {
         return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right));
     }
 
-    public static<E extends Exception> BooleanExpression<E> createNotBetween(Expression<E> value, Expression<E> left, Expression<E> right)
+    public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right)
     {
         return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
     }
@@ -73,7 +73,7 @@
         REGEXP_CONTROL_CHARS.add(new Character('!'));
     }
 
-    static class LikeExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E>
+    static class LikeExpression extends UnaryExpression implements BooleanExpression
     {
 
         Pattern likePattern;
@@ -81,7 +81,7 @@
         /**
          * @param right
          */
-        public LikeExpression(Expression<E> right, String like, int escape)
+        public LikeExpression(Expression right, String like, int escape)
         {
             super(right);
 
@@ -138,7 +138,7 @@
         /**
          *  org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
          */
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
 
             Object rv = this.getRight().evaluate(message);
@@ -158,7 +158,7 @@
             return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
         }
 
-        public boolean matches(Filterable<E> message) throws E
+        public boolean matches(Filterable message)
         {
             Object object = evaluate(message);
 
@@ -236,7 +236,7 @@
         return doCreateEqual(left, right);
     }
 
-    private static<E extends Exception> BooleanExpression<E> doCreateEqual(Expression<E> left, Expression<E> right)
+    private static BooleanExpression doCreateEqual(Expression left, Expression right)
     {
         return new EqualExpression(left, right);
     }
@@ -388,7 +388,7 @@
         super(left, right);
     }
 
-    public Object evaluate(Filterable<E> message) throws E
+    public Object evaluate(Filterable message)
     {
         Comparable lv = (Comparable) left.evaluate(message);
         if (lv == null)
@@ -550,21 +550,21 @@
 
     protected abstract boolean asBoolean(int answer);
 
-    public boolean matches(Filterable<E> message) throws E
+    public boolean matches(Filterable message)
     {
         Object object = evaluate(message);
 
         return (object != null) && (object == Boolean.TRUE);
     }
 
-    private static class EqualExpression<E extends Exception> extends ComparisonExpression<E>
+    private static class EqualExpression extends ComparisonExpression
     {
-        public EqualExpression(final Expression<E> left, final Expression<E> right)
+        public EqualExpression(final Expression left, final Expression right)
         {
             super(left, right);
         }
 
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
             Object lv = left.evaluate(message);
             Object rv = right.evaluate(message);

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java Mon Jul 20 19:05:05 2009
@@ -32,17 +32,17 @@
 /**
  * Represents a constant expression
  */
-public class ConstantExpression<E extends Exception> implements Expression<E>
+public class ConstantExpression implements Expression
 {
 
-    static class BooleanConstantExpression<E extends Exception> extends ConstantExpression<E> implements BooleanExpression<E>
+    static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression
     {
         public BooleanConstantExpression(Object value)
         {
             super(value);
         }
 
-        public boolean matches(Filterable<E> message) throws E
+        public boolean matches(Filterable message)
         {
             Object object = evaluate(message);
 
@@ -121,7 +121,7 @@
         this.value = value;
     }
 
-    public Object evaluate(Filterable<E> message) throws E
+    public Object evaluate(Filterable message)
     {
         return value;
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java Mon Jul 20 19:05:05 2009
@@ -27,12 +27,12 @@
 /**
  * Represents an expression
  */
-public interface Expression<E extends Exception>
+public interface Expression
 {
 
     /**
      * @return the value of this expression
      */
-    public Object evaluate(Filterable<E> message) throws E;
+    public Object evaluate(Filterable message);
 
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java Mon Jul 20 19:05:05 2009
@@ -27,13 +27,13 @@
 import org.apache.qpid.server.queue.Filterable;
 import org.apache.qpid.AMQException;
 
-public interface FilterManager<E extends Exception>
+public interface FilterManager
 {
-    void add(MessageFilter<E> filter);
+    void add(MessageFilter filter);
 
-    void remove(MessageFilter<E> filter);
+    void remove(MessageFilter filter);
 
-    boolean allAllow(Filterable<E>  msg);
+    boolean allAllow(Filterable  msg);
 
     boolean hasFilters();
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Mon Jul 20 19:05:05 2009
@@ -26,12 +26,12 @@
 import org.apache.qpid.server.queue.Filterable;
 
 
-public class JMSSelectorFilter<E extends Exception> implements MessageFilter<E>
+public class JMSSelectorFilter implements MessageFilter
 {
     private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
 
     private String _selector;
-    private BooleanExpression<E> _matcher;
+    private BooleanExpression _matcher;
 
     public JMSSelectorFilter(String selector) throws AMQException
     {
@@ -39,7 +39,7 @@
         _matcher = new SelectorParser().parse(selector);
     }
 
-    public boolean matches(Filterable<E> message) throws E
+    public boolean matches(Filterable message)
     {
         boolean match = _matcher.matches(message);
         if(_logger.isDebugEnabled())

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java Mon Jul 20 19:05:05 2009
@@ -27,15 +27,15 @@
 /**
  * A filter performing a comparison of two objects
  */
-public abstract class LogicExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E>
+public abstract class LogicExpression extends BinaryExpression implements BooleanExpression
 {
 
-    public static<E extends Exception> BooleanExpression createOR(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue)
+    public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue)
     {
         return new OrExpression(lvalue, rvalue);
     }
 
-    public static<E extends Exception> BooleanExpression createAND(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue)
+    public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue)
     {
         return new AndExpression(lvalue, rvalue);
     }
@@ -49,23 +49,23 @@
         super(left, right);
     }
 
-    public abstract Object evaluate(Filterable<E> message) throws E;
+    public abstract Object evaluate(Filterable message);
 
-    public boolean matches(Filterable<E> message) throws E
+    public boolean matches(Filterable message)
     {
         Object object = evaluate(message);
 
         return (object != null) && (object == Boolean.TRUE);
     }
 
-    private static class OrExpression<E extends Exception> extends LogicExpression<E>
+    private static class OrExpression extends LogicExpression
     {
-        public OrExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue)
+        public OrExpression(final BooleanExpression lvalue, final BooleanExpression rvalue)
         {
             super(lvalue, rvalue);
         }
 
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
 
             Boolean lv = (Boolean) left.evaluate(message);
@@ -86,14 +86,14 @@
         }
     }
 
-    private static class AndExpression<E extends Exception> extends LogicExpression<E>
+    private static class AndExpression extends LogicExpression
     {
-        public AndExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue)
+        public AndExpression(final BooleanExpression lvalue, final BooleanExpression rvalue)
         {
             super(lvalue, rvalue);
         }
 
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
 
             Boolean lv = (Boolean) left.evaluate(message);

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Mon Jul 20 19:05:05 2009
@@ -24,7 +24,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.Filterable;
 
-public interface MessageFilter<E extends Exception>
+public interface MessageFilter
 {
-    boolean matches(Filterable<E> message) throws E;
+    boolean matches(Filterable message);
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Mon Jul 20 19:05:05 2009
@@ -27,7 +27,6 @@
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.CommonContentHeaderProperties;
 import org.apache.qpid.server.queue.Filterable;
@@ -35,7 +34,7 @@
 /**
  * Represents a property  expression
  */
-public class PropertyExpression<E extends Exception> implements Expression<E>
+public class PropertyExpression implements Expression
 {
     // Constants - defined the same as JMS
     private static final int NON_PERSISTENT = 1;
@@ -44,12 +43,12 @@
 
     private static final Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
 
-    private static final HashMap<String, Expression<? extends Exception>> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression<? extends Exception>>();
+    private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>();
 
     {
-        JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression<E>()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression()
                                      {
-                                         public Object evaluate(Filterable<E> message)
+                                         public Object evaluate(Filterable message)
                                          {
                                              //TODO
                                              return null;
@@ -73,9 +72,9 @@
 
         JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression());
 
-        JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression<E>()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression()
                                      {
-                                         public Object evaluate(Filterable message) throws E
+                                         public Object evaluate(Filterable message)
                                          {
                                              return message.isRedelivered();
                                          }
@@ -83,7 +82,7 @@
     }
 
     private final String name;
-    private final Expression<E> jmsPropertyExpression;
+    private final Expression jmsPropertyExpression;
 
     public boolean outerTest()
     {
@@ -96,10 +95,10 @@
 
         
 
-        jmsPropertyExpression = (Expression<E>) JMS_PROPERTY_EXPRESSIONS.get(name);
+        jmsPropertyExpression = (Expression) JMS_PROPERTY_EXPRESSIONS.get(name);
     }
 
-    public Object evaluate(Filterable<E> message) throws E
+    public Object evaluate(Filterable message)
     {
 
         if (jmsPropertyExpression != null)
@@ -108,17 +107,7 @@
         }
         else
         {
-
-            CommonContentHeaderProperties _properties =
-                (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
-
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Looking up property:" + name);
-                _logger.debug("Properties are:" + _properties.getHeaders().keySet());
-            }
-
-            return _properties.getHeaders().getObject(name);
+            return message.getMessageHeader().getHeader(name);
         }
     }
 
@@ -158,39 +147,30 @@
 
     }
 
-    private static class ReplyToExpression<E extends Exception> implements Expression<E>
+    private static class ReplyToExpression implements Expression
     {
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
-
-            CommonContentHeaderProperties _properties =
-                (CommonContentHeaderProperties)
-                    message.getContentHeaderBody().properties;
-            AMQShortString replyTo = _properties.getReplyTo();
-
-            return (replyTo == null) ? null : replyTo.toString();
-
+            String replyTo = message.getMessageHeader().getReplyTo();
+            return replyTo;
         }
 
     }
 
-    private static class TypeExpression<E extends Exception> implements Expression<E>
+    private static class TypeExpression implements Expression
     {
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
-                CommonContentHeaderProperties _properties =
-                    (CommonContentHeaderProperties)
-                        message.getContentHeaderBody().properties;
-                AMQShortString type = _properties.getType();
 
-                return (type == null) ? null : type.toString();
+                String type = message.getMessageHeader().getType();
+                return type;
 
         }
     }
 
-    private static class DeliveryModeExpression<E extends Exception> implements Expression<E>
+    private static class DeliveryModeExpression implements Expression
     {
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
                 int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
                 if (_logger.isDebugEnabled())
@@ -202,68 +182,53 @@
         }
     }
 
-    private static class PriorityExpression<E extends Exception> implements Expression<E>
+    private static class PriorityExpression implements Expression
     {
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
-            CommonContentHeaderProperties _properties =
-                (CommonContentHeaderProperties)
-                    message.getContentHeaderBody().properties;
-
-            return (int) _properties.getPriority();
+            byte priority = message.getMessageHeader().getPriority();
+            return (int) priority;
         }
     }
 
-    private static class MessageIDExpression<E extends Exception> implements Expression<E>
+    private static class MessageIDExpression implements Expression
     {
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
 
-            CommonContentHeaderProperties _properties =
-                (CommonContentHeaderProperties)
-                    message.getContentHeaderBody().properties;
-            AMQShortString messageId = _properties.getMessageId();
+            String messageId = message.getMessageHeader().getMessageId();
 
-            return (messageId == null) ? null : messageId;
+            return messageId;
 
         }
     }
 
-    private static class TimestampExpression<E extends Exception> implements Expression<E>
+    private static class TimestampExpression implements Expression
     {
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
-            CommonContentHeaderProperties _properties =
-                (CommonContentHeaderProperties)
-                    message.getContentHeaderBody().properties;
-
-            return _properties.getTimestamp();
+            long timestamp = message.getMessageHeader().getTimestamp();
+            return timestamp;
         }
     }
 
-    private static class CorrelationIdExpression<E extends Exception> implements Expression<E>
+    private static class CorrelationIdExpression implements Expression
     {
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
-            CommonContentHeaderProperties _properties =
-                (CommonContentHeaderProperties)
-                    message.getContentHeaderBody().properties;
-            AMQShortString correlationId = _properties.getCorrelationId();
 
-            return (correlationId == null) ? null : correlationId.toString();
+            String correlationId = message.getMessageHeader().getCorrelationId();
+
+            return correlationId;
         }
     }
 
-    private static class ExpirationExpression<E extends Exception> implements Expression<E>
+    private static class ExpirationExpression implements Expression
     {
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
-
-            CommonContentHeaderProperties _properties =
-                (CommonContentHeaderProperties)
-                    message.getContentHeaderBody().properties;
-
-            return _properties.getExpiration();
+            long expiration = message.getMessageHeader().getExpiration();
+            return expiration;
 
         }
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java Mon Jul 20 19:05:05 2009
@@ -27,43 +27,34 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.Filterable;
 
-public class SimpleFilterManager implements FilterManager<AMQException>
+public class SimpleFilterManager implements FilterManager
 {
     private final Logger _logger = Logger.getLogger(SimpleFilterManager.class);
 
-    private final ConcurrentLinkedQueue<MessageFilter<AMQException>> _filters;
+    private final ConcurrentLinkedQueue<MessageFilter> _filters;
 
     public SimpleFilterManager()
     {
         _logger.debug("Creating SimpleFilterManager");
-        _filters = new ConcurrentLinkedQueue<MessageFilter<AMQException>>();
+        _filters = new ConcurrentLinkedQueue<MessageFilter>();
     }
 
-    public void add(MessageFilter<AMQException> filter)
+    public void add(MessageFilter filter)
     {
         _filters.add(filter);
     }
 
-    public void remove(MessageFilter<AMQException> filter)
+    public void remove(MessageFilter filter)
     {
         _filters.remove(filter);
     }
 
-    public boolean allAllow(Filterable<AMQException> msg)
+    public boolean allAllow(Filterable msg)
     {
-        for (MessageFilter<AMQException> filter : _filters)
+        for (MessageFilter filter : _filters)
         {
-            try
+            if (!filter.matches(msg))
             {
-                if (!filter.matches(msg))
-                {
-                    return false;
-                }
-            }
-            catch (AMQException e)
-            {
-                //fixme
-                e.printStackTrace();  
                 return false;
             }
         }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java Mon Jul 20 19:05:05 2009
@@ -35,18 +35,18 @@
 /**
  * An expression which performs an operation on two expression values
  */
-public abstract class UnaryExpression<E extends Exception> implements Expression<E>
+public abstract class UnaryExpression implements Expression
 {
 
     private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE);
-    protected Expression<E> right;
+    protected Expression right;
 
-    public static<E extends Exception> Expression<E> createNegate(Expression<E> left)
+    public static Expression createNegate(Expression left)
     {
         return new NegativeExpression(left);
     }
 
-    public static<E extends Exception> BooleanExpression createInExpression(PropertyExpression<E> right, List elements, final boolean not)
+    public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not)
     {
 
         // Use a HashSet if there are many elements.
@@ -69,14 +69,14 @@
         return new InExpression(right, inList, not);
     }
 
-    abstract static class BooleanUnaryExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E>
+    abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression
     {
-        public BooleanUnaryExpression(Expression<E> left)
+        public BooleanUnaryExpression(Expression left)
         {
             super(left);
         }
 
-        public boolean matches(Filterable<E> message) throws E
+        public boolean matches(Filterable message)
         {
             Object object = evaluate(message);
 
@@ -85,7 +85,7 @@
     }
     ;
 
-    public static<E extends Exception> BooleanExpression<E> createNOT(BooleanExpression<E> left)
+    public static<E extends Exception> BooleanExpression createNOT(BooleanExpression left)
     {
         return new NotExpression(left);
     }
@@ -100,7 +100,7 @@
         return new XQueryExpression(xpath);
     }
 
-    public static<E extends Exception> BooleanExpression createBooleanCast(Expression<E> left)
+    public static<E extends Exception> BooleanExpression createBooleanCast(Expression left)
     {
         return new BooleanCastExpression(left);
     }
@@ -151,7 +151,7 @@
         this.right = left;
     }
 
-    public Expression<E> getRight()
+    public Expression getRight()
     {
         return right;
     }
@@ -204,14 +204,14 @@
      */
     public abstract String getExpressionSymbol();
 
-    private static class NegativeExpression<E extends Exception> extends UnaryExpression<E>
+    private static class NegativeExpression extends UnaryExpression
     {
-        public NegativeExpression(final Expression<E> left)
+        public NegativeExpression(final Expression left)
         {
             super(left);
         }
 
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
             Object rvalue = right.evaluate(message);
             if (rvalue == null)
@@ -233,19 +233,19 @@
         }
     }
 
-    private static class InExpression<E extends Exception> extends BooleanUnaryExpression<E>
+    private static class InExpression extends BooleanUnaryExpression
     {
         private final Collection _inList;
         private final boolean _not;
 
-        public InExpression(final PropertyExpression<E> right, final Collection inList, final boolean not)
+        public InExpression(final PropertyExpression right, final Collection inList, final boolean not)
         {
             super(right);
             _inList = inList;
             _not = not;
         }
 
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
 
             Object rvalue = right.evaluate(message);
@@ -309,14 +309,14 @@
         }
     }
 
-    private static class NotExpression<E extends Exception> extends BooleanUnaryExpression<E>
+    private static class NotExpression extends BooleanUnaryExpression
     {
-        public NotExpression(final BooleanExpression<E> left)
+        public NotExpression(final BooleanExpression left)
         {
             super(left);
         }
 
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
             Boolean lvalue = (Boolean) right.evaluate(message);
             if (lvalue == null)
@@ -333,14 +333,14 @@
         }
     }
 
-    private static class BooleanCastExpression<E extends Exception> extends BooleanUnaryExpression<E>
+    private static class BooleanCastExpression extends BooleanUnaryExpression
     {
-        public BooleanCastExpression(final Expression<E> left)
+        public BooleanCastExpression(final Expression left)
         {
             super(left);
         }
 
-        public Object evaluate(Filterable<E> message) throws E
+        public Object evaluate(Filterable message)
         {
             Object rvalue = right.evaluate(message);
             if (rvalue == null)

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java Mon Jul 20 19:05:05 2009
@@ -71,7 +71,7 @@
     private final XPathEvaluator evaluator;
 
     static public interface XPathEvaluator {
-        public boolean evaluate(Filterable message) throws AMQException;
+        public boolean evaluate(Filterable message);
     }
 
     XPathExpression(String xpath) {
@@ -93,7 +93,7 @@
         }
     }
 
-    public Object evaluate(Filterable message) throws AMQException {
+    public Object evaluate(Filterable message)  {
 //        try {
 //FIXME this is flow to disk work
 //            if( message.isDropped() )
@@ -118,7 +118,7 @@
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws AMQException
      */
-    public boolean matches(Filterable message) throws AMQException
+    public boolean matches(Filterable message)
     {
         Object object = evaluate(message);
         return object!=null && object==Boolean.TRUE;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org