You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC

svn commit: r686136 [3/17] - in /incubator/qpid/branches/qpid.0-10/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/bin/ broker/etc/ broker/...

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Thu Aug 14 20:40:49 2008
@@ -25,6 +25,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.protocol.ExchangeInitialiser;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -121,9 +122,9 @@
      * @param payload
      * @throws AMQException if something goes wrong delivering data
      */
-    public void routeContent(AMQMessage payload) throws AMQException
+    public void routeContent(IncomingMessage payload) throws AMQException
     {
-        final AMQShortString exchange = payload.getMessagePublishInfo().getExchange();
+        final AMQShortString exchange = payload.getExchange();
         final Exchange exch = getExchange(exchange);
         // there is a small window of opportunity for the exchange to be deleted in between
         // the BasicPublish being received (where the exchange is validated) and the final

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Thu Aug 14 20:40:49 2008
@@ -23,7 +23,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.AMQMessage;
+
+import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -53,7 +54,7 @@
 
     void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
 
-    void route(AMQMessage message) throws AMQException;
+    void route(IncomingMessage message) throws AMQException;
 
 
     /**
@@ -92,6 +93,6 @@
      */
     boolean hasBindings();
 
-    Map<AMQShortString, List<AMQQueue>> getBindings();
+    
 
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Thu Aug 14 20:40:49 2008
@@ -26,10 +26,9 @@
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -95,7 +94,7 @@
 
             try
             {
-                queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
+                queue.bind(FanoutExchange.this, new AMQShortString(binding), null);
             }
             catch (AMQException ex)
             {
@@ -183,32 +182,17 @@
         }
     }
 
-    public void route(AMQMessage payload) throws AMQException
+    public void route(IncomingMessage payload) throws AMQException
     {
-        final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
-        final AMQShortString routingKey = publishInfo.getRoutingKey();
-        if ((_queues == null) || _queues.isEmpty())
+
+    
+        if (_logger.isDebugEnabled())
         {
-            String msg = "No queues bound to " + this;
-            if (publishInfo.isMandatory() || publishInfo.isImmediate())
-            {
-                throw new NoRouteException(msg, payload);
-            }
-            else
-            {
-                _logger.warn(msg);
-            }
+            _logger.debug("Publishing message to queue " + _queues);
         }
-        else
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Publishing message to queue " + _queues);
-            }
 
-            payload.enqueue(new ArrayList(_queues));
+        payload.enqueue(new ArrayList(_queues));
 
-        }
     }
 
     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Thu Aug 14 20:40:49 2008
@@ -31,7 +31,7 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -50,6 +50,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Collection;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
@@ -240,7 +241,7 @@
         }
     }
 
-    public void route(AMQMessage payload) throws AMQException
+    public void route(IncomingMessage payload) throws AMQException
     {
         FieldTable headers = getHeaders(payload.getContentHeaderBody());
         if (_logger.isDebugEnabled())
@@ -248,8 +249,10 @@
             _logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
         }
         boolean routed = false;
+        ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
         for (Registration e : _bindings)
         {
+
             if (e.binding.matches(headers))
             {
                 if (_logger.isDebugEnabled())
@@ -257,25 +260,12 @@
                     _logger.debug("Exchange " + getName() + ": delivering message with headers " +
                                   headers + " to " + e.queue.getName());
                 }
-                payload.enqueue(e.queue);
-                routed = true;
-            }
-        }
-        if (!routed)
-        {
-
-            String msg = "Exchange " + getName() + ": message not routable.";
+                queues.add(e.queue);
 
-            if (payload.getMessagePublishInfo().isMandatory() || payload.getMessagePublishInfo().isImmediate())
-            {
-                throw new NoRouteException(msg, payload);
-            }
-            else
-            {
-                _logger.warn(msg);
+                routed = true;
             }
-
         }
+        payload.enqueue(queues);
     }
 
     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java Thu Aug 14 20:40:49 2008
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -32,27 +33,27 @@
 
 /**
  * An index of queues against routing key. Allows multiple queues to be stored
- * against the same key. Used in the DestNameExchange.
+ * against the same key. Used in the DirectExchange.
  */
 class Index
 {
-    private ConcurrentMap<AMQShortString, List<AMQQueue>> _index
-            = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+    private ConcurrentMap<AMQShortString, ArrayList<AMQQueue>> _index
+            = new ConcurrentHashMap<AMQShortString, ArrayList<AMQQueue>>();
 
     synchronized boolean add(AMQShortString key, AMQQueue queue)
     {
-        List<AMQQueue> queues = _index.get(key);
+        ArrayList<AMQQueue> queues = _index.get(key);
         if(queues == null)
         {
-            queues = new CopyOnWriteArrayList<AMQQueue>();
-            //next call is atomic, so there is no race to create the list
-            List<AMQQueue> active = _index.putIfAbsent(key, queues);
-            if(active != null)
-            {
-                //someone added the new one in faster than we did, so use theirs
-                queues = active;
-            }
+            queues = new ArrayList<AMQQueue>();
+        }
+        else
+        {
+            queues = new ArrayList<AMQQueue>(queues);
         }
+        // store the new (or copied) list under the key; add() is synchronized, so writers cannot race here
+        _index.put(key, queues);
+
         if(queues.contains(queue))
         {
             return false;
@@ -65,20 +66,28 @@
 
     synchronized boolean remove(AMQShortString key, AMQQueue queue)
     {
-        List<AMQQueue> queues = _index.get(key);
+        ArrayList<AMQQueue> queues = _index.get(key);
         if (queues != null)
         {
+            queues = new ArrayList<AMQQueue>(queues);
             boolean removed = queues.remove(queue);
-            if (queues.size() == 0)
+            if(removed)
             {
-                _index.remove(key);
+                if (queues.size() == 0)
+                {
+                    _index.remove(key);
+                }
+                else
+                {
+                    _index.put(key, queues);
+                }
             }
             return removed;
         }
         return false;
     }
 
-    List<AMQQueue> get(AMQShortString key)
+    ArrayList<AMQQueue> get(AMQShortString key)
     {
         return _index.get(key);
     }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java Thu Aug 14 20:40:49 2008
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
 
 /**
  * Separated out from the ExchangeRegistry interface to allow components
@@ -36,5 +37,5 @@
      *
      * @throws org.apache.qpid.AMQException if something goes wrong delivering data
      */
-    void routeContent(AMQMessage message) throws AMQException;
+    void routeContent(IncomingMessage message) throws AMQException;
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java Thu Aug 14 20:40:49 2008
@@ -23,6 +23,7 @@
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
 
 /**
  * NoRouteException is a {@link RequiredDeliveryException} that represents the failure case where a mandatory message
@@ -36,9 +37,9 @@
  */
 public class NoRouteException extends RequiredDeliveryException
 {
-    public NoRouteException(String msg, AMQMessage message)
+    public NoRouteException(String msg, AMQMessage amqMessage)
     {
-        super(msg, message);
+        super(msg, amqMessage);
     }
 
     public AMQConstant getReplyCode()

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java Thu Aug 14 20:40:49 2008
@@ -21,12 +21,12 @@
 //
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * An expression which performs an operation on two expression values
  */
-public abstract class ArithmeticExpression extends BinaryExpression
+public abstract class ArithmeticExpression<E extends Exception> extends BinaryExpression<E>
 {
 
     protected static final int INTEGER = 1;
@@ -248,7 +248,7 @@
         }
     }
 
-    public Object evaluate(AMQMessage message) throws AMQException
+    public Object evaluate(Filterable<E> message) throws E
     {
         Object lvalue = left.evaluate(message);
         if (lvalue == null)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java Thu Aug 14 20:40:49 2008
@@ -23,23 +23,23 @@
 /**
  * An expression which performs an operation on two expression values.
  */
-public abstract class BinaryExpression implements Expression
+public abstract class BinaryExpression<E extends Exception> implements Expression<E>
 {
-    protected Expression left;
-    protected Expression right;
+    protected Expression<E> left;
+    protected Expression<E> right;
 
-    public BinaryExpression(Expression left, Expression right)
+    public BinaryExpression(Expression<E> left, Expression<E> right)
     {
         this.left = left;
         this.right = right;
     }
 
-    public Expression getLeft()
+    public Expression<E> getLeft()
     {
         return left;
     }
 
-    public Expression getRight()
+    public Expression<E> getRight()
     {
         return right;
     }
@@ -90,7 +90,7 @@
     /**
      * @param expression
      */
-    public void setRight(Expression expression)
+    public void setRight(Expression<E> expression)
     {
         right = expression;
     }
@@ -98,7 +98,7 @@
     /**
      * @param expression
      */
-    public void setLeft(Expression expression)
+    public void setLeft(Expression<E> expression)
     {
         left = expression;
     }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java Thu Aug 14 20:40:49 2008
@@ -22,19 +22,20 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * A BooleanExpression is an expression that always
  * produces a Boolean result.
  */
-public interface BooleanExpression extends Expression
+public interface BooleanExpression<E extends Exception> extends Expression<E>
 {
 
     /**
      * @param message
      * @return true if the expression evaluates to Boolean.TRUE.
-     * @throws AMQException
+     * @throws E
      */
-    public boolean matches(AMQMessage message) throws AMQException;
+    public boolean matches(Filterable<E> message) throws E;
 
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java Thu Aug 14 20:40:49 2008
@@ -28,21 +28,21 @@
 import java.util.regex.Pattern;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * A filter performing a comparison of two objects
  */
-public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression
+public abstract class ComparisonExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E>
 {
 
-    public static BooleanExpression createBetween(Expression value, Expression left, Expression right)
+    public static<E extends Exception> BooleanExpression<E> createBetween(Expression<E> value, Expression left, Expression<E> right)
     {
         return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right));
     }
 
-    public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right)
+    public static<E extends Exception> BooleanExpression<E> createNotBetween(Expression<E> value, Expression<E> left, Expression<E> right)
     {
         return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
     }
@@ -73,7 +73,7 @@
         REGEXP_CONTROL_CHARS.add(new Character('!'));
     }
 
-    static class LikeExpression extends UnaryExpression implements BooleanExpression
+    static class LikeExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E>
     {
 
         Pattern likePattern;
@@ -81,7 +81,7 @@
         /**
          * @param right
          */
-        public LikeExpression(Expression right, String like, int escape)
+        public LikeExpression(Expression<E> right, String like, int escape)
         {
             super(right);
 
@@ -138,7 +138,7 @@
         /**
          *  org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
          */
-        public Object evaluate(AMQMessage message) throws AMQException
+        public Object evaluate(Filterable<E> message) throws E
         {
 
             Object rv = this.getRight().evaluate(message);
@@ -148,11 +148,6 @@
                 return null;
             }
 
-            if(rv instanceof AMQShortString)
-            {
-                rv = rv.toString();
-            }
-
             if (!(rv instanceof String))
             {
                 return
@@ -163,7 +158,7 @@
             return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
         }
 
-        public boolean matches(AMQMessage message) throws AMQException
+        public boolean matches(Filterable<E> message) throws E
         {
             Object object = evaluate(message);
 
@@ -241,45 +236,9 @@
         return doCreateEqual(left, right);
     }
 
-    private static BooleanExpression doCreateEqual(Expression left, Expression right)
+    private static<E extends Exception> BooleanExpression<E> doCreateEqual(Expression<E> left, Expression<E> right)
     {
-        return new ComparisonExpression(left, right)
-            {
-
-                public Object evaluate(AMQMessage message) throws AMQException
-                {
-                    Object lv = left.evaluate(message);
-                    Object rv = right.evaluate(message);
-
-                    // Iff one of the values is null
-                    if ((lv == null) ^ (rv == null))
-                    {
-                        return Boolean.FALSE;
-                    }
-
-                    if ((lv == rv) || lv.equals(rv))
-                    {
-                        return Boolean.TRUE;
-                    }
-
-                    if ((lv instanceof Comparable) && (rv instanceof Comparable))
-                    {
-                        return compare((Comparable) lv, (Comparable) rv);
-                    }
-
-                    return Boolean.FALSE;
-                }
-
-                protected boolean asBoolean(int answer)
-                {
-                    return answer == 0;
-                }
-
-                public String getExpressionSymbol()
-                {
-                    return "=";
-                }
-            };
+        return new EqualExpression(left, right);
     }
 
     public static BooleanExpression createGreaterThan(final Expression left, final Expression right)
@@ -429,7 +388,7 @@
         super(left, right);
     }
 
-    public Object evaluate(AMQMessage message) throws AMQException
+    public Object evaluate(Filterable<E> message) throws E
     {
         Comparable lv = (Comparable) left.evaluate(message);
         if (lv == null)
@@ -454,40 +413,7 @@
         // try to convert up to allow the comparison.
         if (lc != rc)
         {
-            if(lc == AMQShortString.class)
-            {
-                if(rc == String.class)
-                {
-                    rv = new AMQShortString((String) rv);
-
-                    if(right instanceof ConstantExpression)
-                    {
-                        ((ConstantExpression)right).setValue(rv);
-                    }
-                }
-                else
-                {
-                    return Boolean.FALSE;
-                }
-            }
-            else if(lc == String.class)
-            {
-                if(rc == AMQShortString.class)
-                {
-                    lv = new AMQShortString((String) lv);
-
-                    if(left instanceof ConstantExpression)
-                    {
-                        ((ConstantExpression)left).setValue(lv);
-                    }
-                }
-                else
-                {
-                    return Boolean.FALSE;
-                }
-
-            }
-            else if (lc == Byte.class)
+            if (lc == Byte.class)
             {
                 if (rc == Short.class)
                 {
@@ -624,11 +550,52 @@
 
     protected abstract boolean asBoolean(int answer);
 
-    public boolean matches(AMQMessage message) throws AMQException
+    public boolean matches(Filterable<E> message) throws E
     {
         Object object = evaluate(message);
 
         return (object != null) && (object == Boolean.TRUE);
     }
 
+    private static class EqualExpression<E extends Exception> extends ComparisonExpression<E>
+    {
+        public EqualExpression(final Expression<E> left, final Expression<E> right)
+        {
+            super(left, right);
+        }
+
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            Object lv = left.evaluate(message);
+            Object rv = right.evaluate(message);
+
+            // Iff one of the values is null
+            if ((lv == null) ^ (rv == null))
+            {
+                return Boolean.FALSE;
+            }
+
+            if ((lv == rv) || lv.equals(rv))
+            {
+                return Boolean.TRUE;
+            }
+
+            if ((lv instanceof Comparable) && (rv instanceof Comparable))
+            {
+                return compare((Comparable) lv, (Comparable) rv);
+            }
+
+            return Boolean.FALSE;
+        }
+
+        protected boolean asBoolean(int answer)
+        {
+            return answer == 0;
+        }
+
+        public String getExpressionSymbol()
+        {
+            return "=";
+        }
+    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java Thu Aug 14 20:40:49 2008
@@ -26,23 +26,23 @@
 import java.math.BigDecimal;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * Represents a constant expression
  */
-public class ConstantExpression implements Expression
+public class ConstantExpression<E extends Exception> implements Expression<E>
 {
 
-    static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression
+    static class BooleanConstantExpression<E extends Exception> extends ConstantExpression<E> implements BooleanExpression<E>
     {
         public BooleanConstantExpression(Object value)
         {
             super(value);
         }
 
-        public boolean matches(AMQMessage message) throws AMQException
+        public boolean matches(Filterable<E> message) throws E
         {
             Object object = evaluate(message);
 
@@ -121,7 +121,7 @@
         this.value = value;
     }
 
-    public Object evaluate(AMQMessage message) throws AMQException
+    public Object evaluate(Filterable<E> message) throws E
     {
         return value;
     }
@@ -131,12 +131,6 @@
         return value;
     }
 
-    public void setValue(final Object value)
-    {
-        this.value = value;
-    }
-
-
     /**
      * @see java.lang.Object#toString()
      */

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java Thu Aug 14 20:40:49 2008
@@ -22,16 +22,17 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * Represents an expression
  */
-public interface Expression
+public interface Expression<E extends Exception>
 {
 
     /**
      * @return the value of this expression
      */
-    public Object evaluate(AMQMessage message) throws AMQException;
+    public Object evaluate(Filterable<E> message) throws E;
 
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java Thu Aug 14 20:40:49 2008
@@ -24,14 +24,16 @@
 //
 
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
+import org.apache.qpid.AMQException;
 
-public interface FilterManager
+public interface FilterManager<E extends Exception>
 {
-    void add(MessageFilter filter);
+    void add(MessageFilter<E> filter);
 
-    void remove(MessageFilter filter);
+    void remove(MessageFilter<E> filter);
 
-    boolean allAllow(AMQMessage msg);
+    boolean allAllow(Filterable<E>  msg);
 
     boolean hasFilters();
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java Thu Aug 14 20:40:49 2008
@@ -39,7 +39,7 @@
         if (filters != null)
         {
 
-            manager = new SimpleFilterManager();
+
 
             if(filters.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()))
             {
@@ -47,23 +47,13 @@
 
                 if (selector != null && !selector.equals(""))
                 {
+                    manager = new SimpleFilterManager();
                     manager.add(new JMSSelectorFilter(selector));
                 }
 
             }
 
-            if (filters.containsKey(AMQPFilterTypes.NO_CONSUME.getValue()))
-            {
-                manager.add(new NoConsumerFilter());
-            }
-
 
-
-            //If we added no filters don't bear the overhead of having an filter manager
-            if (!manager.hasFilters())
-            {
-                manager = null;
-            }
         }
         else
         {

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Thu Aug 14 20:40:49 2008
@@ -23,45 +23,30 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.filter.jms.selector.SelectorParser;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 
-
-public class JMSSelectorFilter implements MessageFilter
+public class JMSSelectorFilter<E extends Exception> implements MessageFilter<E>
 {
     private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
 
     private String _selector;
-    private BooleanExpression _matcher;
+    private BooleanExpression<E> _matcher;
 
     public JMSSelectorFilter(String selector) throws AMQException
     {
         _selector = selector;
-        _logger.info("Created JMSSelectorFilter with selector:" + _selector);
-
-
         _matcher = new SelectorParser().parse(selector);
-
-
     }
 
-    public boolean matches(AMQMessage message)
+    public boolean matches(Filterable<E> message) throws E
     {
-        try
-        {
-            boolean match = _matcher.matches(message);
-            if(_logger.isDebugEnabled())
-            {
-                _logger.debug(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
-            }
-            return match;
-        }
-        catch (AMQException e)
+        boolean match = _matcher.matches(message);
+        if(_logger.isDebugEnabled())
         {
-            //fixme this needs to be sorted.. it shouldn't happen
-            e.printStackTrace();  
+            _logger.debug(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
         }
-        return false;
+        return match;
     }
 
     public String getSelector()

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java Thu Aug 14 20:40:49 2008
@@ -22,71 +22,22 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * A filter performing a comparison of two objects
  */
-public abstract class LogicExpression extends BinaryExpression implements BooleanExpression
+public abstract class LogicExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E>
 {
 
-    public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue)
+    public static<E extends Exception> BooleanExpression createOR(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue)
     {
-        return new LogicExpression(lvalue, rvalue)
-            {
-
-                public Object evaluate(AMQMessage message) throws AMQException
-                {
-
-                    Boolean lv = (Boolean) left.evaluate(message);
-                    // Can we do an OR shortcut??
-                    if ((lv != null) && lv.booleanValue())
-                    {
-                        return Boolean.TRUE;
-                    }
-
-                    Boolean rv = (Boolean) right.evaluate(message);
-
-                    return (rv == null) ? null : rv;
-                }
-
-                public String getExpressionSymbol()
-                {
-                    return "OR";
-                }
-            };
+        return new OrExpression(lvalue, rvalue);
     }
 
-    public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue)
+    public static<E extends Exception> BooleanExpression createAND(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue)
     {
-        return new LogicExpression(lvalue, rvalue)
-            {
-
-                public Object evaluate(AMQMessage message) throws AMQException
-                {
-
-                    Boolean lv = (Boolean) left.evaluate(message);
-
-                    // Can we do an AND shortcut??
-                    if (lv == null)
-                    {
-                        return null;
-                    }
-
-                    if (!lv.booleanValue())
-                    {
-                        return Boolean.FALSE;
-                    }
-
-                    Boolean rv = (Boolean) right.evaluate(message);
-
-                    return (rv == null) ? null : rv;
-                }
-
-                public String getExpressionSymbol()
-                {
-                    return "AND";
-                }
-            };
+        return new AndExpression(lvalue, rvalue);
     }
 
     /**
@@ -98,13 +49,74 @@
         super(left, right);
     }
 
-    public abstract Object evaluate(AMQMessage message) throws AMQException;
+    public abstract Object evaluate(Filterable<E> message) throws E;
 
-    public boolean matches(AMQMessage message) throws AMQException
+    public boolean matches(Filterable<E> message) throws E
     {
         Object object = evaluate(message);
 
         return (object != null) && (object == Boolean.TRUE);
     }
 
+    private static class OrExpression<E extends Exception> extends LogicExpression<E>
+    {
+        public OrExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue)
+        {
+            super(lvalue, rvalue);
+        }
+
+        public Object evaluate(Filterable<E> message) throws E
+        {
+
+            Boolean lv = (Boolean) left.evaluate(message);
+            // Can we do an OR shortcut??
+            if ((lv != null) && lv.booleanValue())
+            {
+                return Boolean.TRUE;
+            }
+
+            Boolean rv = (Boolean) right.evaluate(message);
+
+            return (rv == null) ? null : rv;
+        }
+
+        public String getExpressionSymbol()
+        {
+            return "OR";
+        }
+    }
+
+    private static class AndExpression<E extends Exception> extends LogicExpression<E>
+    {
+        public AndExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue)
+        {
+            super(lvalue, rvalue);
+        }
+
+        public Object evaluate(Filterable<E> message) throws E
+        {
+
+            Boolean lv = (Boolean) left.evaluate(message);
+
+            // Can we do an AND shortcut??
+            if (lv == null)
+            {
+                return null;
+            }
+
+            if (!lv.booleanValue())
+            {
+                return Boolean.FALSE;
+            }
+
+            Boolean rv = (Boolean) right.evaluate(message);
+
+            return (rv == null) ? null : rv;
+        }
+
+        public String getExpressionSymbol()
+        {
+            return "AND";
+        }
+    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Thu Aug 14 20:40:49 2008
@@ -22,8 +22,9 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
-public interface MessageFilter
+public interface MessageFilter<E extends Exception>
 {
-    boolean matches(AMQMessage message) throws AMQException;
+    boolean matches(Filterable<E> message) throws E;
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java Thu Aug 14 20:40:49 2008
@@ -22,7 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 public class NoConsumerFilter implements MessageFilter
 {
@@ -34,7 +34,7 @@
         _logger.info("Created NoConsumerFilter");
     }
 
-    public boolean matches(AMQMessage message)
+    public boolean matches(Filterable message)
     {
        return true;
     }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Thu Aug 14 20:40:49 2008
@@ -30,12 +30,12 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * Represents a property  expression
  */
-public class PropertyExpression implements Expression
+public class PropertyExpression<E extends Exception> implements Expression<E>
 {
     // Constants - defined the same as JMS
     private static final int NON_PERSISTENT = 1;
@@ -44,223 +44,60 @@
 
     private static final Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
 
-    private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>();
+    private static final HashMap<String, Expression<? extends Exception>> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression<? extends Exception>>();
 
-    static
     {
-        JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression<E>()
                                      {
-                                         public Object evaluate(AMQMessage message)
+                                         public Object evaluate(Filterable<E> message)
                                          {
                                              //TODO
                                              return null;
                                          }
                                      });
-        JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-                                                 AMQShortString replyTo = _properties.getReplyTo();
-
-                                                 return (replyTo == null) ? null : replyTo;
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-
-                                                 return null;
-                                             }
-
-                                         }
-
-                                     });
-
-        JMS_PROPERTY_EXPRESSIONS.put("JMSType", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-                                                 AMQShortString type = _properties.getType();
-
-                                                 return (type == null) ? null : type;
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-
-                                                 return null;
-                                             }
-
-                                         }
-                                     });
-
-        JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-                                             try
-                                             {
-                                                 int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
-                                                 if (_logger.isDebugEnabled())
-                                                 {
-                                                     _logger.debug("JMSDeliveryMode is :" + mode);
-                                                 }
-
-                                                 return mode;
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-                                             }
-
-                                             return NON_PERSISTENT;
-                                         }
-                                     });
-
-        JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-
-                                                 return (int) _properties.getPriority();
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-                                             }
-
-                                             return DEFAULT_PRIORITY;
-                                         }
-                                     });
-
-        JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-                                                 AMQShortString messageId = _properties.getMessageId();
-
-                                                 return (messageId == null) ? null : messageId;
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
+        JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new ReplyToExpression());
 
-                                                 return null;
-                                             }
+        JMS_PROPERTY_EXPRESSIONS.put("JMSType", new TypeExpression());
 
-                                         }
-                                     });
-
-        JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-
-                                                 return _properties.getTimestamp();
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-
-                                                 return null;
-                                             }
-
-                                         }
-                                     });
-
-        JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-                                                 AMQShortString correlationId = _properties.getCorrelationId();
-
-                                                 return (correlationId == null) ? null : correlationId;
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-
-                                                 return null;
-                                             }
+        JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new DeliveryModeExpression());
 
-                                         }
-                                     });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new PriorityExpression());
 
-        JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
+        JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new MessageIDExpression());
 
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-
-                                                 return _properties.getExpiration();
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
+        JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new TimestampExpression());
 
-                                                 return null;
-                                             }
+        JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new CorrelationIdExpression());
 
-                                         }
-                                     });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression());
 
-        JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression<E>()
                                      {
-                                         public Object evaluate(AMQMessage message)
+                                         public Object evaluate(Filterable message) throws E
                                          {
                                              return message.isRedelivered();
                                          }
                                      });
-
     }
 
-    private final AMQShortString name;
-    private final Expression jmsPropertyExpression;
+    private final String name;
+    private final Expression<E> jmsPropertyExpression;
+
+    public boolean outerTest()
+    {
+        return false;
+    }
 
     public PropertyExpression(String name)
     {
-        this.name = new AMQShortString(name);
-        jmsPropertyExpression = JMS_PROPERTY_EXPRESSIONS.get(name);
+        this.name = name;
+
+        
+
+        jmsPropertyExpression = (Expression<E>) JMS_PROPERTY_EXPRESSIONS.get(name);
     }
 
-    public Object evaluate(AMQMessage message) throws AMQException
+    public Object evaluate(Filterable<E> message) throws E
     {
 
         if (jmsPropertyExpression != null)
@@ -283,7 +120,7 @@
         }
     }
 
-    public AMQShortString getName()
+    public String getName()
     {
         return name;
     }
@@ -293,7 +130,7 @@
      */
     public String toString()
     {
-        return name.toString();
+        return name;
     }
 
     /**
@@ -319,4 +156,113 @@
 
     }
 
+    private static class ReplyToExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+
+            CommonContentHeaderProperties _properties =
+                (CommonContentHeaderProperties)
+                    message.getContentHeaderBody().properties;
+            AMQShortString replyTo = _properties.getReplyTo();
+
+            return (replyTo == null) ? null : replyTo.toString();
+
+        }
+
+    }
+
+    private static class TypeExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+                CommonContentHeaderProperties _properties =
+                    (CommonContentHeaderProperties)
+                        message.getContentHeaderBody().properties;
+                AMQShortString type = _properties.getType();
+
+                return (type == null) ? null : type.toString();
+
+        }
+    }
+
+    private static class DeliveryModeExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+                int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("JMSDeliveryMode is :" + mode);
+                }
+
+                return mode;
+        }
+    }
+
+    private static class PriorityExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            CommonContentHeaderProperties _properties =
+                (CommonContentHeaderProperties)
+                    message.getContentHeaderBody().properties;
+
+            return (int) _properties.getPriority();
+        }
+    }
+
+    private static class MessageIDExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+
+            CommonContentHeaderProperties _properties =
+                (CommonContentHeaderProperties)
+                    message.getContentHeaderBody().properties;
+            AMQShortString messageId = _properties.getMessageId();
+
+            return (messageId == null) ? null : messageId;
+
+        }
+    }
+
+    private static class TimestampExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            CommonContentHeaderProperties _properties =
+                (CommonContentHeaderProperties)
+                    message.getContentHeaderBody().properties;
+
+            return _properties.getTimestamp();
+        }
+    }
+
+    private static class CorrelationIdExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            CommonContentHeaderProperties _properties =
+                (CommonContentHeaderProperties)
+                    message.getContentHeaderBody().properties;
+            AMQShortString correlationId = _properties.getCorrelationId();
+
+            return (correlationId == null) ? null : correlationId.toString();
+        }
+    }
+
+    private static class ExpirationExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+
+            CommonContentHeaderProperties _properties =
+                (CommonContentHeaderProperties)
+                    message.getContentHeaderBody().properties;
+
+            return _properties.getExpiration();
+
+        }
+    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java Thu Aug 14 20:40:49 2008
@@ -25,32 +25,33 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
-public class SimpleFilterManager implements FilterManager
+public class SimpleFilterManager implements FilterManager<AMQException>
 {
     private final Logger _logger = Logger.getLogger(SimpleFilterManager.class);
 
-    private final ConcurrentLinkedQueue<MessageFilter> _filters;
+    private final ConcurrentLinkedQueue<MessageFilter<AMQException>> _filters;
 
     public SimpleFilterManager()
     {
         _logger.debug("Creating SimpleFilterManager");
-        _filters = new ConcurrentLinkedQueue<MessageFilter>();
+        _filters = new ConcurrentLinkedQueue<MessageFilter<AMQException>>();
     }
 
-    public void add(MessageFilter filter)
+    public void add(MessageFilter<AMQException> filter)
     {
         _filters.add(filter);
     }
 
-    public void remove(MessageFilter filter)
+    public void remove(MessageFilter<AMQException> filter)
     {
         _filters.remove(filter);
     }
 
-    public boolean allAllow(AMQMessage msg)
+    public boolean allAllow(Filterable<AMQException> msg)
     {
-        for (MessageFilter filter : _filters)
+        for (MessageFilter<AMQException> filter : _filters)
         {
             try
             {

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java Thu Aug 14 20:40:49 2008
@@ -30,45 +30,23 @@
 import java.util.List;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
- * An expression which performs an operation on two expression values
+ * An expression which performs an operation on a single expression value
  */
-public abstract class UnaryExpression implements Expression
+public abstract class UnaryExpression<E extends Exception> implements Expression<E>
 {
 
     private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE);
-    protected Expression right;
+    protected Expression<E> right;
 
-    public static Expression createNegate(Expression left)
+    public static<E extends Exception> Expression<E> createNegate(Expression<E> left)
     {
-        return new UnaryExpression(left)
-        {
-            public Object evaluate(AMQMessage message) throws AMQException
-            {
-                Object rvalue = right.evaluate(message);
-                if (rvalue == null)
-                {
-                    return null;
-                }
-
-                if (rvalue instanceof Number)
-                {
-                    return negate((Number) rvalue);
-                }
-
-                return null;
-            }
-
-            public String getExpressionSymbol()
-            {
-                return "-";
-            }
-        };
+        return new NegativeExpression<E>(left); // explicit <E> avoids an unchecked raw-type instantiation
     }
 
-    public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not)
+    public static<E extends Exception> BooleanExpression createInExpression(PropertyExpression<E> right, List elements, final boolean not)
     {
 
         // Use a HashSet if there are many elements.
@@ -88,81 +66,17 @@
 
         final Collection inList = t;
 
-        return new BooleanUnaryExpression(right)
-        {
-            public Object evaluate(AMQMessage message) throws AMQException
-            {
-
-                Object rvalue = right.evaluate(message);
-                if (rvalue == null)
-                {
-                    return null;
-                }
-
-                if (rvalue.getClass() != String.class)
-                {
-                    return null;
-                }
-
-                if (((inList != null) && inList.contains(rvalue)) ^ not)
-                {
-                    return Boolean.TRUE;
-                }
-                else
-                {
-                    return Boolean.FALSE;
-                }
-
-            }
-
-            public String toString()
-            {
-                StringBuffer answer = new StringBuffer();
-                answer.append(right);
-                answer.append(" ");
-                answer.append(getExpressionSymbol());
-                answer.append(" ( ");
-
-                int count = 0;
-                for (Iterator i = inList.iterator(); i.hasNext();)
-                {
-                    Object o = (Object) i.next();
-                    if (count != 0)
-                    {
-                        answer.append(", ");
-                    }
-
-                    answer.append(o);
-                    count++;
-                }
-
-                answer.append(" )");
-
-                return answer.toString();
-            }
-
-            public String getExpressionSymbol()
-            {
-                if (not)
-                {
-                    return "NOT IN";
-                }
-                else
-                {
-                    return "IN";
-                }
-            }
-        };
+        return new InExpression(right, inList, not);
     }
 
-    abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression
+    abstract static class BooleanUnaryExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E>
     {
-        public BooleanUnaryExpression(Expression left)
+        public BooleanUnaryExpression(Expression<E> left)
         {
             super(left);
         }
 
-        public boolean matches(AMQMessage message) throws AMQException
+        public boolean matches(Filterable<E> message) throws E
         {
             Object object = evaluate(message);
 
@@ -171,26 +85,9 @@
     }
     ;
 
-    public static BooleanExpression createNOT(BooleanExpression left)
+    public static<E extends Exception> BooleanExpression<E> createNOT(BooleanExpression<E> left)
     {
-        return new BooleanUnaryExpression(left)
-        {
-            public Object evaluate(AMQMessage message) throws AMQException
-            {
-                Boolean lvalue = (Boolean) right.evaluate(message);
-                if (lvalue == null)
-                {
-                    return null;
-                }
-
-                return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE;
-            }
-
-            public String getExpressionSymbol()
-            {
-                return "NOT";
-            }
-        };
+        return new NotExpression<E>(left); // explicit <E> avoids an unchecked raw-type instantiation
     }
 
     public static BooleanExpression createXPath(final String xpath)
@@ -203,36 +100,9 @@
         return new XQueryExpression(xpath);
     }
 
-    public static BooleanExpression createBooleanCast(Expression left)
+    public static<E extends Exception> BooleanExpression<E> createBooleanCast(Expression<E> left)
     {
-        return new BooleanUnaryExpression(left)
-        {
-            public Object evaluate(AMQMessage message) throws AMQException
-            {
-                Object rvalue = right.evaluate(message);
-                if (rvalue == null)
-                {
-                    return null;
-                }
-
-                if (!rvalue.getClass().equals(Boolean.class))
-                {
-                    return Boolean.FALSE;
-                }
-
-                return ((Boolean) rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE;
-            }
-
-            public String toString()
-            {
-                return right.toString();
-            }
-
-            public String getExpressionSymbol()
-            {
-                return "";
-            }
-        };
+        return new BooleanCastExpression<E>(left); // explicit <E> here and in the return type keeps the result typed
     }
 
     private static Number negate(Number left)
@@ -281,7 +151,7 @@
         this.right = left;
     }
 
-    public Expression getRight()
+    public Expression<E> getRight()
     {
         return right;
     }
@@ -334,4 +204,166 @@
      */
     public abstract String getExpressionSymbol();
 
+    /** Numeric negation of the wrapped operand; the type returned by createNegate. */
+    private static class NegativeExpression<E extends Exception> extends UnaryExpression<E>
+    {
+        public NegativeExpression(final Expression<E> operand)
+        {
+            super(operand);
+        }
+
+        /**
+         * Evaluates the operand against the message and negates the result.
+         *
+         * @param message the message the operand is evaluated against
+         * @return the negated value, or null when the operand's value is null or not a Number
+         * @throws E propagated from evaluating the operand
+         */
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            final Object value = right.evaluate(message);
+            // instanceof is false for null, so a single test covers both null-returning cases
+            return (value instanceof Number) ? negate((Number) value) : null;
+        }
+
+        /** @return the operator symbol, "-" */
+        public String getExpressionSymbol()
+        {
+            return "-";
+        }
+    }
+
+    /**
+     * Set-membership test ("IN" / "NOT IN") on a property value; the type returned by createInExpression.
+     */
+    private static class InExpression<E extends Exception> extends BooleanUnaryExpression<E>
+    {
+        /** Candidate values; may be null, which is treated the same as an empty collection. */
+        private final Collection<?> _inList;
+        /** True for the "NOT IN" form, which inverts the membership result. */
+        private final boolean _not;
+
+        public InExpression(final PropertyExpression<E> right, final Collection<?> inList, final boolean not)
+        {
+            super(right);
+            _inList = inList;
+            _not = not;
+        }
+
+        /**
+         * Tests whether the operand's value is a member of the candidate collection.
+         *
+         * @param message the message the operand is evaluated against
+         * @return null when the operand's value is null or not a String; otherwise
+         *         Boolean.TRUE or Boolean.FALSE, with the result inverted for "NOT IN"
+         * @throws E propagated from evaluating the operand
+         */
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            final Object rvalue = right.evaluate(message);
+
+            // Only values whose class is exactly String are compared; anything else yields null.
+            if (rvalue == null || rvalue.getClass() != String.class)
+            {
+                return null;
+            }
+
+            final boolean member = (_inList != null) && _inList.contains(rvalue);
+            return (member ^ _not) ? Boolean.TRUE : Boolean.FALSE;
+        }
+
+        /**
+         * Renders the expression in the form "operand IN ( a, b, c )".
+         *
+         * @return the textual form; a null candidate collection renders as an empty list
+         *         instead of failing with a NullPointerException
+         */
+        public String toString()
+        {
+            final StringBuilder answer = new StringBuilder();
+            answer.append(right);
+            answer.append(" ");
+            answer.append(getExpressionSymbol());
+            answer.append(" ( ");
+
+            // evaluate() tolerates a null list, so toString() must as well
+            if (_inList != null)
+            {
+                String separator = "";
+                for (Object element : _inList)
+                {
+                    answer.append(separator).append(element);
+                    separator = ", ";
+                }
+            }
+
+            answer.append(" )");
+
+            return answer.toString();
+        }
+
+        /** @return "NOT IN" for the negated form, otherwise "IN" */
+        public String getExpressionSymbol()
+        {
+            return _not ? "NOT IN" : "IN";
+        }
+    }
+
+    /** Logical negation of a boolean operand; the type returned by createNOT. */
+    private static class NotExpression<E extends Exception> extends BooleanUnaryExpression<E>
+    {
+        public NotExpression(final BooleanExpression<E> operand)
+        {
+            super(operand);
+        }
+
+        /**
+         * Inverts the operand's value; null stays null. The value is cast to Boolean unchecked.
+         * @throws E propagated from evaluating the operand
+         */
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            final Boolean value = (Boolean) right.evaluate(message);
+            return (value == null) ? null : Boolean.valueOf(!value.booleanValue());
+        }
+
+        public String getExpressionSymbol()
+        {
+            return "NOT";
+        }
+    }
+
+    /** Coerces an arbitrary expression to a boolean one; the type returned by createBooleanCast. */
+    private static class BooleanCastExpression<E extends Exception> extends BooleanUnaryExpression<E>
+    {
+        public BooleanCastExpression(final Expression<E> operand)
+        {
+            super(operand);
+        }
+
+        /**
+         * Yields null for a null operand value, Boolean.FALSE for any non-Boolean value, and
+         * otherwise the canonical Boolean constant equal to the operand's value.
+         */
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            final Object value = right.evaluate(message);
+            if (value instanceof Boolean)
+            {
+                return Boolean.valueOf(((Boolean) value).booleanValue());
+            }
+            // null stays null; every other non-Boolean value counts as false
+            return (value == null) ? null : Boolean.FALSE;
+        }
+
+        public String toString()
+        {
+            return right.toString();
+        }
+
+        public String getExpressionSymbol()
+        {
+            return "";
+        }
+    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java Thu Aug 14 20:40:49 2008
@@ -23,6 +23,7 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -70,7 +71,7 @@
     private final XPathEvaluator evaluator;
 
     static public interface XPathEvaluator {
-        public boolean evaluate(AMQMessage message) throws AMQException;
+        public boolean evaluate(Filterable message) throws AMQException;
     }
 
     XPathExpression(String xpath) {
@@ -92,7 +93,7 @@
         }
     }
 
-    public Object evaluate(AMQMessage message) throws AMQException {
+    public Object evaluate(Filterable message) throws AMQException {
 //        try {
 //FIXME this is flow to disk work
 //            if( message.isDropped() )
@@ -117,7 +118,7 @@
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws AMQException
      */
-    public boolean matches(AMQMessage message) throws AMQException
+    public boolean matches(Filterable message) throws AMQException
     {
         Object object = evaluate(message);
         return object!=null && object==Boolean.TRUE;

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java Thu Aug 14 20:40:49 2008
@@ -19,6 +19,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 //
 // Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
@@ -35,7 +36,7 @@
         this.xpath = xpath;
     }
 
-    public Object evaluate(AMQMessage message) throws AMQException {
+    public Object evaluate(Filterable message) throws AMQException {
         return Boolean.FALSE;
     }
 
@@ -48,7 +49,7 @@
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws AMQException
      */
-    public boolean matches(AMQMessage message) throws AMQException
+    public boolean matches(Filterable message) throws AMQException
     {
         Object object = evaluate(message);
         return object!=null && object==Boolean.TRUE;            

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java Thu Aug 14 20:40:49 2008
@@ -29,6 +29,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 import org.apache.xpath.CachedXPathAPI;
 import org.w3c.dom.Document;
 import org.w3c.dom.traversal.NodeIterator;
@@ -42,7 +43,7 @@
         this.xpath = xpath;
     }
     
-    public boolean evaluate(AMQMessage m) throws AMQException
+    public boolean evaluate(Filterable m) throws AMQException
     {
         // TODO - we would have to check the content type and then evaluate the content
         //        here... is this really a feature we wish to implement? - RobG

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java Thu Aug 14 20:40:49 2008
@@ -1,65 +1,64 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.handler;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.AMQException;
-
-/**
- * @author Apache Software Foundation
- *
- *
- */
-public class AccessRequestHandler implements StateAwareMethodListener<AccessRequestBody>
-{
-    private static final AccessRequestHandler _instance = new AccessRequestHandler();
-
-
-    public static AccessRequestHandler getInstance()
-    {
-        return _instance;
-    }
-
-    private AccessRequestHandler()
-    {
-    }
-
-    public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException
-    {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-
-        MethodRegistry methodRegistry = session.getMethodRegistry();
-
-        // We don't implement access control class, but to keep clients happy that expect it
-        // always use the "0" ticket.
-        AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
-
-        session.writeFrame(response.generateFrame(channelId));
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+
+/**
+ * Answers AMQP access-request methods. Access control is not implemented, so every request
+ * is acknowledged with an access-request-ok body carrying the fixed ticket 0.
+ *
+ * @author Apache Software Foundation
+ */
+public class AccessRequestHandler implements StateAwareMethodListener<AccessRequestBody>
+{
+    /** The one shared instance; the constructor is private. */
+    private static final AccessRequestHandler _instance = new AccessRequestHandler();
+
+    /** @return the shared handler instance */
+    public static AccessRequestHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private AccessRequestHandler()
+    {
+    }
+
+    /** Replies to an access request on the given channel with an access-request-ok frame. */
+    public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException
+    {
+        AMQProtocolSession session = stateManager.getProtocolSession();
+
+        MethodRegistry methodRegistry = session.getMethodRegistry();
+
+        // We don't implement access control class, but to keep clients happy that expect it
+        // always use the "0" ticket.
+        AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
+
+        session.writeFrame(response.generateFrame(channelId));
+    }
+}

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -21,11 +21,9 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.BasicCancelBody;
 import org.apache.qpid.framing.BasicCancelOkBody;
 import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
@@ -65,7 +63,7 @@
                        " nowait:" + body.getNowait());
         }
 
-        channel.unsubscribeConsumer(session, body.getConsumerTag());
+        channel.unsubscribeConsumer(body.getConsumerTag());
         if (!body.getNowait())
         {
             MethodRegistry methodRegistry = session.getMethodRegistry();

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -111,7 +111,7 @@
 
                 try
                 {
-                    AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, session, !body.getNoAck(),
+                    AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
                                                                           body.getArguments(), body.getNoLocal(), body.getExclusive());
                     if (!body.getNowait())
                     {
@@ -121,8 +121,7 @@
 
                     }
 
-                    //now allow queue to start async processing of any backlog of messages
-                    queue.deliverAsync();
+                    
                 }
                 catch (org.apache.qpid.AMQInvalidArgumentException ise)
                 {