You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/06/01 21:24:36 UTC

svn commit: r1488561 [1/3] - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/transport/ broker/src/test/java/org/apache/qpi...

Author: rgodfrey
Date: Sat Jun  1 19:24:36 2013
New Revision: 1488561

URL: http://svn.apache.org/r1488561
Log:
QPID-4897 : [Java Broker] Allow selectors on bindings for non-topic exchanges

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java   (with props)
Removed:
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Sat Jun  1 19:24:36 2013
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -173,20 +174,106 @@ public abstract class AbstractExchange i
         return getVirtualHost().getQueueRegistry();
     }
 
-    public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue)
+    public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue)
     {
-        return isBound(new AMQShortString(bindingKey), queue);
+        return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue);
     }
 
+    public final boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue)
+    {
+        for(Binding b : _bindings)
+        {
+            if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue())
+            {
+                return (b.getArguments() == null || b.getArguments().isEmpty())
+                       ? (arguments == null || arguments.isEmpty())
+                       : b.getArguments().equals(arguments);
+            }
+        }
+        return false;
+    }
+
+    public final boolean isBound(AMQShortString routingKey, AMQQueue queue)
+    {
+        return isBound(routingKey==null ? "" : routingKey.asString(), queue);
+    }
+
+    public final boolean isBound(String bindingKey, AMQQueue queue)
+    {
+        for(Binding b : _bindings)
+        {
+            if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue())
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public final boolean isBound(AMQShortString routingKey)
+    {
+        return isBound(routingKey == null ? "" : routingKey.asString());
+    }
+
+    public final boolean isBound(String bindingKey)
+    {
+        for(Binding b : _bindings)
+        {
+            if(bindingKey.equals(b.getBindingKey()))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public final boolean isBound(AMQQueue queue)
+    {
+        for(Binding b : _bindings)
+        {
+            if(queue == b.getQueue())
+            {
+                return true;
+            }
+        }
+        return false;
+    }
 
-    public boolean isBound(String bindingKey, AMQQueue queue)
+    @Override
+    public final boolean isBound(Map<String, Object> arguments, AMQQueue queue)
     {
-        return isBound(new AMQShortString(bindingKey), queue);
+        for(Binding b : _bindings)
+        {
+            if(queue == b.getQueue() &&
+               ((b.getArguments() == null || b.getArguments().isEmpty())
+                       ? (arguments == null || arguments.isEmpty())
+                       : b.getArguments().equals(arguments)))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public final boolean isBound(String bindingKey, Map<String, Object> arguments)
+    {
+        for(Binding b : _bindings)
+        {
+            if(b.getBindingKey().equals(bindingKey) &&
+               ((b.getArguments() == null || b.getArguments().isEmpty())
+                       ? (arguments == null || arguments.isEmpty())
+                       : b.getArguments().equals(arguments)))
+            {
+                return true;
+            }
+        }
+        return false;
     }
 
-    public boolean isBound(String bindingKey)
+    public final boolean hasBindings()
     {
-        return isBound(new AMQShortString(bindingKey));
+        return !_bindings.isEmpty();
     }
 
     public Exchange getAlternateExchange()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Sat Jun  1 19:24:36 2013
@@ -272,6 +272,18 @@ public class DefaultExchange implements 
     }
 
     @Override
+    public boolean isBound(Map<String, Object> arguments, AMQQueue queue)
+    {
+        return (arguments == null || arguments.isEmpty()) && isBound(queue);
+    }
+
+    @Override
+    public boolean isBound(String bindingKey, Map<String, Object> arguments)
+    {
+        return (arguments == null || arguments.isEmpty()) && isBound(bindingKey);
+    }
+
+    @Override
     public boolean isBound(String bindingKey)
     {
         return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Sat Jun  1 19:24:36 2013
@@ -20,9 +20,18 @@
  */
 package org.apache.qpid.server.exchange;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -36,10 +45,14 @@ import java.util.concurrent.CopyOnWriteA
 
 public class DirectExchange extends AbstractExchange
 {
+
+    private static final Logger _logger = Logger.getLogger(DirectExchange.class);
+
     private static final class BindingSet
     {
         private CopyOnWriteArraySet<Binding> _bindings = new CopyOnWriteArraySet<Binding>();
-        private List<BaseQueue> _queues = new ArrayList<BaseQueue>();
+        private List<BaseQueue> _unfilteredQueues = new ArrayList<BaseQueue>();
+        private Map<BaseQueue, MessageFilter> _filteredQueues = new HashMap<BaseQueue, MessageFilter>();
 
         public synchronized void addBinding(Binding binding)
         {
@@ -56,27 +69,59 @@ public class DirectExchange extends Abst
         private void recalculateQueues()
         {
             List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
+            Map<BaseQueue, MessageFilter> filteredQueues = new HashMap<BaseQueue,MessageFilter>();
 
             for(Binding b : _bindings)
             {
-                if(!queues.contains(b.getQueue()))
+
+                if(FilterSupport.argumentsContainFilter(b.getArguments()))
                 {
-                    queues.add(b.getQueue());
+                    try
+                    {
+                        MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getQueue());
+                        filteredQueues.put(b.getQueue(),filter);
+                    }
+                    catch (AMQInvalidArgumentException e)
+                    {
+                        _logger.warn("Binding ignored: cannot parse filter on binding of queue '"+b.getQueue().getName()
+                                     + "' to exchange '" + b.getExchange().getName()
+                                     + "' with arguments: " + b.getArguments(), e);
+                    }
+
+                }
+                else
+                {
+
+                    if(!queues.contains(b.getQueue()))
+                    {
+                        queues.add(b.getQueue());
+                    }
                 }
             }
-            _queues = queues;
+            _unfilteredQueues = queues;
+            _filteredQueues = filteredQueues;
         }
 
 
-        public List<BaseQueue> getQueues()
+        public List<BaseQueue> getUnfilteredQueues()
         {
-            return _queues;
+            return _unfilteredQueues;
         }
 
         public CopyOnWriteArraySet<Binding> getBindings()
         {
             return _bindings;
         }
+
+        public boolean hasFilteredQueues()
+        {
+            return !_filteredQueues.isEmpty();
+        }
+
+        public Map<BaseQueue,MessageFilter> getFilteredQueues()
+        {
+            return _filteredQueues;
+        }
     }
 
     private final ConcurrentHashMap<String, BindingSet> _bindingsByKey =
@@ -98,7 +143,30 @@ public class DirectExchange extends Abst
 
         if(bindings != null)
         {
-            return bindings.getQueues();
+            List<BaseQueue> queues = bindings.getUnfilteredQueues();
+
+            if(bindings.hasFilteredQueues())
+            {
+                Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues);
+
+                Map<BaseQueue, MessageFilter> filteredQueues = bindings.getFilteredQueues();
+                for(Map.Entry<BaseQueue, MessageFilter> entry : filteredQueues.entrySet())
+                {
+                    if(!queuesSet.contains(entry.getKey()))
+                    {
+                        MessageFilter filter = entry.getValue();
+                        if(filter.matches(payload))
+                        {
+                            queuesSet.add(entry.getKey());
+                        }
+                    }
+                }
+                if(queues.size() != queuesSet.size())
+                {
+                    queues = new ArrayList<BaseQueue>(queuesSet);
+                }
+            }
+            return queues;
         }
         else
         {
@@ -106,50 +174,6 @@ public class DirectExchange extends Abst
         }
 
 
-
-    }
-
-    public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
-    {
-        return isBound(routingKey,queue);
-    }
-
-    public boolean isBound(AMQShortString routingKey, AMQQueue queue)
-    {
-        String bindingKey = (routingKey == null) ? "" : routingKey.toString();
-        BindingSet bindings = _bindingsByKey.get(bindingKey);
-        if(bindings != null)
-        {
-            return bindings.getQueues().contains(queue);
-        }
-        return false;
-
-    }
-
-    public boolean isBound(AMQShortString routingKey)
-    {
-        String bindingKey = (routingKey == null) ? "" : routingKey.toString();
-        BindingSet bindings = _bindingsByKey.get(bindingKey);
-        return bindings != null && !bindings.getQueues().isEmpty();
-    }
-
-    public boolean isBound(AMQQueue queue)
-    {
-
-        for (BindingSet bindings : _bindingsByKey.values())
-        {
-            if(bindings.getQueues().contains(queue))
-            {
-                return true;
-            }
-
-        }
-        return false;
-    }
-
-    public boolean hasBindings()
-    {
-        return !getBindings().isEmpty();
     }
 
     protected void onBind(final Binding binding)
@@ -189,5 +213,4 @@ public class DirectExchange extends Abst
 
     }
 
-
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Sat Jun  1 19:24:36 2013
@@ -145,12 +145,15 @@ public interface Exchange extends Exchan
 
     Collection<Binding> getBindings();
 
+    boolean isBound(String bindingKey);
 
     boolean isBound(String bindingKey, AMQQueue queue);
 
-    public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue);
+    boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue);
 
-    boolean isBound(String bindingKey);
+    boolean isBound(Map<String, Object> arguments, AMQQueue queue);
+
+    boolean isBound(String bindingKey, Map<String, Object> arguments);
 
     void removeReference(ExchangeReferrer exchange);
 
@@ -158,6 +161,8 @@ public interface Exchange extends Exchan
 
     boolean hasReferrers();
 
+
+
     public interface BindingListener
     {
         void bindingAdded(Exchange exchange, Binding binding);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Sat Jun  1 19:24:36 2013
@@ -20,11 +20,21 @@
  */
 package org.apache.qpid.server.exchange;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -42,7 +52,18 @@ public class FanoutExchange extends Abst
     /**
      * Maps from queue name to queue instances
      */
-    private final ConcurrentHashMap<AMQQueue,Integer> _queues = new ConcurrentHashMap<AMQQueue,Integer>();
+    private final Map<AMQQueue,Integer> _queues = new HashMap<AMQQueue,Integer>();
+    private final CopyOnWriteArrayList<AMQQueue> _unfilteredQueues = new CopyOnWriteArrayList<AMQQueue>();
+    private final CopyOnWriteArrayList<AMQQueue> _filteredQueues = new CopyOnWriteArrayList<AMQQueue>();
+
+    private final AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>>  _filteredBindings =
+            new AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>>();
+    {
+        Map<AMQQueue,Map<Binding, MessageFilter>> emptyMap = Collections.emptyMap();
+        _filteredBindings.set(emptyMap);
+    }
+
+
 
     public static final ExchangeType<FanoutExchange> TYPE = new FanoutExchangeType();
 
@@ -54,115 +75,150 @@ public class FanoutExchange extends Abst
     public ArrayList<BaseQueue> doRoute(InboundMessage payload)
     {
 
-
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Publishing message to queue " + _queues);
-        }
-
         for(Binding b : getBindings())
         {
             b.incrementMatches();
         }
 
-        return new ArrayList<BaseQueue>(_queues.keySet());
-
-    }
+        final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues);
 
-    public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
-    {
-        return isBound(routingKey, queue);
-    }
 
-    public boolean isBound(AMQShortString routingKey, AMQQueue queue)
-    {
-        return isBound(queue);
-    }
+        final Map<AMQQueue, Map<Binding, MessageFilter>> filteredBindings = _filteredBindings.get();
+        if(!_filteredQueues.isEmpty())
+        {
+            for(AMQQueue q : _filteredQueues)
+            {
+                final Map<Binding, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q);
+                if(!(bindingMessageFilterMap == null || result.contains(q)))
+                {
+                    for(MessageFilter filter : bindingMessageFilterMap.values())
+                    {
+                        if(filter.matches(payload))
+                        {
+                            result.add(q);
+                            break;
+                        }
+                    }
+                }
+            }
 
-    public boolean isBound(AMQShortString routingKey)
-    {
+        }
 
-        return (_queues != null) && !_queues.isEmpty();
-    }
 
-    public boolean isBound(AMQQueue queue)
-    {
-        if (queue == null)
+        if (_logger.isDebugEnabled())
         {
-            return false;
+            _logger.debug("Publishing message to queue " + result);
         }
-        return _queues.containsKey(queue);
-    }
 
-    public boolean hasBindings()
-    {
-        return !_queues.isEmpty();
+        return result;
+
     }
 
-    protected void onBind(final Binding binding)
+
+    protected synchronized void onBind(final Binding binding)
     {
         AMQQueue queue = binding.getQueue();
         assert queue != null;
+        if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
+        {
 
-        Integer oldVal;
+            Integer oldVal;
+            if(_queues.containsKey(queue))
+            {
+                _queues.put(queue,_queues.get(queue)+1);
+            }
+            else
+            {
+                _queues.put(queue, ONE);
+                _unfilteredQueues.add(queue);
+                // No longer any reason to check filters for this queue
+                _filteredQueues.remove(queue);
+            }
 
-        if((oldVal = _queues.putIfAbsent(queue, ONE)) != null)
+        }
+        else
         {
-            Integer newVal = oldVal+1;
-            while(!_queues.replace(queue, oldVal, newVal))
+            try
             {
-                oldVal = _queues.get(queue);
-                if(oldVal == null)
+
+                HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings =
+                        new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get());
+
+                Map<Binding, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue());
+                final
+                MessageFilter messageFilter =
+                        FilterSupport.createMessageFilter(binding.getArguments(), binding.getQueue());
+
+                if(bindingsForQueue != null)
                 {
-                    oldVal = _queues.putIfAbsent(queue, ONE);
-                    if(oldVal == null)
+                    bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue);
+                    bindingsForQueue.put(binding, messageFilter);
+                }
+                else
+                {
+                    bindingsForQueue = Collections.singletonMap(binding, messageFilter);
+                    if(!_unfilteredQueues.contains(queue))
                     {
-                        break;
+                        _filteredQueues.add(queue);
                     }
                 }
-                newVal = oldVal + 1;
+
+                filteredBindings.put(binding.getQueue(), bindingsForQueue);
+
+                _filteredBindings.set(filteredBindings);
+
+            }
+            catch (AMQInvalidArgumentException e)
+            {
+                _logger.warn("Cannoy bind queue " + queue + " to exchange this " + this + " beacuse selector cannot be parsed.", e);
+                return;
             }
         }
-
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Binding queue " + queue
-                          + " with routing key " + new AMQShortString(binding.getBindingKey()) + " to exchange " + this);
+                          + " with routing key " + binding.getBindingKey() + " to exchange " + this);
         }
     }
 
-    protected void onUnbind(final Binding binding)
+    protected synchronized void onUnbind(final Binding binding)
     {
         AMQQueue queue = binding.getQueue();
-        Integer oldValue = _queues.get(queue);
-
-        boolean done = false;
-
-        while(!(done || oldValue == null))
+        if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
         {
-            while(!(done || oldValue == null) && oldValue.intValue() == 1)
+            Integer oldValue = _queues.remove(queue);
+            if(ONE.equals(oldValue))
             {
-                if(!_queues.remove(queue, oldValue))
+                // should start checking filters for this queue
+                if(_filteredBindings.get().containsKey(queue))
                 {
-                    oldValue = _queues.get(queue);
-                }
-                else
-                {
-                    done = true;
+                    _filteredQueues.add(queue);
                 }
+                _unfilteredQueues.remove(queue);
             }
-            while(!(done || oldValue == null) && oldValue.intValue() != 1)
+            else
             {
-                Integer newValue = oldValue - 1;
-                if(!_queues.replace(queue, oldValue, newValue))
-                {
-                    oldValue = _queues.get(queue);
-                }
-                else
-                {
-                    done = true;
-                }
+                _queues.put(queue,oldValue-1);
             }
         }
+        else // we are removing a binding with filters
+        {
+            HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings =
+                    new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get());
+
+            Map<Binding,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue());
+            if(bindingsForQueue.size()>1)
+            {
+                bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue);
+                bindingsForQueue.remove(binding);
+                filteredBindings.put(binding.getQueue(),bindingsForQueue);
+            }
+            else
+            {
+                _filteredQueues.remove(queue);
+            }
+            _filteredBindings.set(filteredBindings);
+
+        }
     }
 }

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java?rev=1488561&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java Sat Jun  1 19:24:36 2013
@@ -0,0 +1,360 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.exchange;
+
+import java.lang.ref.WeakReference;
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.Filterable;
+
+/**
+ * Package-level helpers for recognising filter arguments on a binding (the no-local flag and the
+ * JMS selector) and for building the matching {@link MessageFilter} instances.
+ */
+public class FilterSupport
+{
+    /**
+     * Parsed selectors keyed by selector string. Keys and cached filters are both weakly
+     * referenced, so a lookup can miss even for a selector that was parsed earlier.
+     */
+    private static final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache =
+            Collections.synchronizedMap(new WeakHashMap<String, WeakReference<JMSSelectorFilter>>());
+
+    /**
+     * Returns the JMS selector filter for the selector string held in the given field table.
+     *
+     * @param args binding arguments; must not be null
+     * @return the (possibly cached) filter for the selector string
+     * @throws AMQInvalidArgumentException if the selector string cannot be parsed
+     */
+    static MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException
+    {
+        final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
+        return getMessageFilter(selectorString);
+    }
+
+
+    /**
+     * Returns the JMS selector filter for the selector string held in the given argument map.
+     *
+     * @param args binding arguments keyed by argument name; must not be null
+     * @return the (possibly cached) filter for the selector string
+     * @throws AMQInvalidArgumentException if the selector string cannot be parsed
+     */
+    static MessageFilter createJMSSelectorFilter(Map<String, Object> args) throws AMQInvalidArgumentException
+    {
+        // The map is keyed by String, so look the selector up under the String form of the key,
+        // the same form argumentsContainJMSSelector(Map) tests for.
+        final String selectorString = (String) args.get(AMQPFilterTypes.JMS_SELECTOR.toString());
+        return getMessageFilter(selectorString);
+    }
+
+
+    /**
+     * Looks the selector up in the cache, parsing and caching it when it is absent or its weak
+     * reference has been cleared.
+     *
+     * @param selectorString the JMS selector text
+     * @return the filter for {@code selectorString}
+     * @throws AMQInvalidArgumentException if the selector string cannot be parsed
+     */
+    private static MessageFilter getMessageFilter(String selectorString) throws AMQInvalidArgumentException
+    {
+        final WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
+        JMSSelectorFilter selector = selectorRef == null ? null : selectorRef.get();
+
+        if(selector == null)
+        {
+            try
+            {
+                selector = new JMSSelectorFilter(selectorString);
+            }
+            catch (ParseException e)
+            {
+                throw unparsableSelector(selectorString, e);
+            }
+            catch (SelectorParsingException e)
+            {
+                throw unparsableSelector(selectorString, e);
+            }
+            catch (TokenMgrError e)
+            {
+                throw unparsableSelector(selectorString, e);
+            }
+            _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
+        }
+        return selector;
+    }
+
+    /**
+     * Builds the exception reported for a selector that failed to parse, keeping the cause.
+     */
+    private static AMQInvalidArgumentException unparsableSelector(String selectorString, Throwable cause)
+    {
+        return new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", cause);
+    }
+
+    /**
+     * @param args binding arguments, may be null
+     * @return true if the arguments set the no-local flag or carry a non-blank JMS selector
+     */
+    static boolean argumentsContainFilter(final FieldTable args)
+    {
+        return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
+    }
+
+
+    /**
+     * @param args binding arguments, may be null
+     * @return true if the arguments set the no-local flag or carry a non-blank JMS selector
+     */
+    static boolean argumentsContainFilter(final Map<String, Object> args)
+    {
+        return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
+    }
+
+
+    /**
+     * @param args binding arguments, may be null
+     * @return true if the no-local argument is present with the value {@link Boolean#TRUE}
+     */
+    static boolean argumentsContainNoLocal(final Map<String, Object> args)
+    {
+        return args != null
+                && args.containsKey(AMQPFilterTypes.NO_LOCAL.toString())
+                && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.toString()));
+    }
+
+
+    /**
+     * @param args binding arguments, may be null
+     * @return true if the no-local argument is present with the value {@link Boolean#TRUE}
+     */
+    static boolean argumentsContainNoLocal(final FieldTable args)
+    {
+        return args != null
+                && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue())
+                && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue()));
+    }
+
+
+    /**
+     * @param args binding arguments, may be null
+     * @return true if the JMS selector argument is a String containing more than whitespace
+     */
+    static boolean argumentsContainJMSSelector(final Map<String,Object> args)
+    {
+        if(args == null)
+        {
+            return false;
+        }
+        final Object selector = args.get(AMQPFilterTypes.JMS_SELECTOR.toString());
+        return (selector instanceof String) && ((String) selector).trim().length() != 0;
+    }
+
+
+    /**
+     * @param args binding arguments, may be null
+     * @return true if the JMS selector argument is present and contains more than whitespace
+     */
+    static boolean argumentsContainJMSSelector(final FieldTable args)
+    {
+        if(args == null || !args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()))
+        {
+            return false;
+        }
+        // Guard against a null string (presumably returned when the stored value is not a
+        // string - TODO confirm) instead of failing with a NullPointerException on trim().
+        final String selector = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
+        return selector != null && selector.trim().length() != 0;
+    }
+
+
+    /**
+     * Builds the filter described by the binding arguments: a no-local filter, a JMS selector
+     * filter, or both combined when both arguments are present.
+     *
+     * NOTE(review): this assumes the caller has already checked argumentsContainFilter; without a
+     * no-local flag the selector argument is used as found, even if it is absent.
+     *
+     * @param args binding arguments
+     * @param queue the queue the binding delivers to, used by the no-local filter
+     * @return the filter to apply to messages routed through the binding
+     * @throws AMQInvalidArgumentException if the selector string cannot be parsed
+     */
+    static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
+    {
+        if(argumentsContainNoLocal(args))
+        {
+            MessageFilter filter = new NoLocalFilter(queue);
+
+            if(argumentsContainJMSSelector(args))
+            {
+                filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+            }
+            return filter;
+        }
+        else
+        {
+            return createJMSSelectorFilter(args);
+        }
+    }
+
+    /**
+     * Field-table counterpart of {@link #createMessageFilter(Map, AMQQueue)}.
+     *
+     * @param args binding arguments
+     * @param queue the queue the binding delivers to, used by the no-local filter
+     * @return the filter to apply to messages routed through the binding
+     * @throws AMQInvalidArgumentException if the selector string cannot be parsed
+     */
+    static MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
+    {
+        if(argumentsContainNoLocal(args))
+        {
+            MessageFilter filter = new NoLocalFilter(queue);
+
+            if(argumentsContainJMSSelector(args))
+            {
+                filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+            }
+            return filter;
+        }
+        else
+        {
+            return createJMSSelectorFilter(args);
+        }
+    }
+
+    /**
+     * Rejects messages for which the queue's exclusive owning session reports onSameConnection;
+     * a queue with no exclusive owning session accepts every message.
+     */
+    static final class NoLocalFilter implements MessageFilter
+    {
+        private final AMQQueue _queue;
+
+        public NoLocalFilter(AMQQueue queue)
+        {
+            _queue = queue;
+        }
+
+        public boolean matches(Filterable message)
+        {
+            // The message is expected to be an InboundMessage; anything else fails the cast.
+            InboundMessage inbound = (InboundMessage) message;
+            final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
+            return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
+
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            NoLocalFilter that = (NoLocalFilter) o;
+
+            return _queue == null ? that._queue == null : _queue.equals(that._queue);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return _queue != null ? _queue.hashCode() : 0;
+        }
+    }
+
+    /**
+     * Accepts a message only when both the no-local filter and the JMS selector filter accept it.
+     */
+    static final class CompoundFilter implements MessageFilter
+    {
+        private final MessageFilter _noLocalFilter;
+        private final MessageFilter _jmsSelectorFilter;
+
+        public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
+        {
+            _noLocalFilter = filter;
+            _jmsSelectorFilter = jmsSelectorFilter;
+        }
+
+        public boolean matches(Filterable message)
+        {
+            return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            CompoundFilter that = (CompoundFilter) o;
+
+            if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
+            {
+                return false;
+            }
+            if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
+            {
+                return false;
+            }
+
+            return true;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
+            result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
+            return result;
+        }
+    }
+}

Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Sat Jun  1 19:24:36 2013
@@ -22,15 +22,19 @@ package org.apache.qpid.server.exchange;
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.framing.AMQTypedValue;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.AMQMessageHeader;
 
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * Defines binding and matching based on a set of headers.
@@ -44,13 +48,14 @@ class HeadersBinding
     private final Set<String> required = new HashSet<String>();
     private final Map<String,Object> matches = new HashMap<String,Object>();
     private boolean matchAny;
+    private MessageFilter _filter;
 
     /**
      * Creates a header binding for a set of mappings. Those mappings whose value is
      * null or the empty string are assumed only to be required headers, with
      * no constraint on the value. Those with a non-null value are assumed to
      * define a required match of value.
-     * 
+     *
      * @param binding the binding to create a header binding using
      */
     public HeadersBinding(Binding binding)
@@ -66,9 +71,30 @@ class HeadersBinding
             _mappings = null;
         }
     }
-    
+
     private void initMappings()
     {
+        if(FilterSupport.argumentsContainFilter(_mappings))
+        {
+            try
+            {
+                _filter = FilterSupport.createMessageFilter(_mappings,_binding.getQueue());
+            }
+            catch (AMQInvalidArgumentException e)
+            {
+                _logger.warn("Invalid filter in binding queue '"+_binding.getQueue().getName()
+                             +"' to exchange '"+_binding.getExchange().getName()
+                             +"' with arguments: " + _binding.getArguments());
+                _filter = new MessageFilter()
+                    {
+                        @Override
+                        public boolean matches(Filterable message)
+                        {
+                            return false;
+                        }
+                    };
+            }
+        }
         for(Map.Entry<String, Object> entry : _mappings.entrySet())
         {
             String propertyName = entry.getKey();
@@ -87,7 +113,7 @@ class HeadersBinding
             }
         }
     }
-    
+
     public Binding getBinding()
     {
         return _binding;
@@ -111,6 +137,11 @@ class HeadersBinding
         }
     }
 
+    public boolean matches(InboundMessage message)
+    {
+        return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message));
+    }
+
     private boolean and(AMQMessageHeader headers)
     {
         if(headers.containsHeaders(required))
@@ -215,7 +246,7 @@ class HeadersBinding
     {
         return key.startsWith("X-") || key.startsWith("x-");
     }
-    
+
     @Override
     public boolean equals(final Object o)
     {
@@ -250,4 +281,4 @@ class HeadersBinding
 
         return true;
     }
-}
\ No newline at end of file
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Sat Jun  1 19:24:36 2013
@@ -69,14 +69,14 @@ public class HeadersExchange extends Abs
 {
 
     private static final Logger _logger = Logger.getLogger(HeadersExchange.class);
-    
+
     private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey =
                             new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>();
-    
+
     private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers =
                             new CopyOnWriteArrayList<HeadersBinding>();
 
-    
+
     public static final ExchangeType<HeadersExchange> TYPE = new HeadersExchangeType();
 
     public HeadersExchange()
@@ -87,112 +87,31 @@ public class HeadersExchange extends Abs
 
     public ArrayList<BaseQueue> doRoute(InboundMessage payload)
     {
-        AMQMessageHeader header = payload.getMessageHeader();
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header);
+            _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + payload.getMessageHeader());
         }
-        
+
         LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>();
-        
+
         for (HeadersBinding hb : _bindingHeaderMatchers)
         {
-            if (hb.matches(header))
+            if (hb.matches(payload))
             {
                 Binding b = hb.getBinding();
-                
+
                 b.incrementMatches();
-                
+
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " +
-                                  header + " to " + b.getQueue().getNameShortString());
+                                  payload.getMessageHeader() + " to " + b.getQueue().getNameShortString());
                 }
                 queues.add(b.getQueue());
             }
         }
-        
-        return new ArrayList<BaseQueue>(queues);
-    }
-
-
-    public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
-    {
-        CopyOnWriteArraySet<Binding> bindings;
-        if(bindingKey == null)
-        {
-            bindings = new CopyOnWriteArraySet<Binding>(getBindings());
-        }
-        else
-        {
-            bindings = _bindingsByKey.get(bindingKey);
-        }
-
-        if(bindings != null)
-        {
-            for(Binding binding : bindings)
-            {
-                if(queue == null || binding.getQueue().equals(queue))
-                {
-                    return arguments == null ? binding.getArguments() == null : binding.getArguments().equals(arguments);
-                }
-            }
-        }
-
-        return false;
-    }
-
-    public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
-    {
-        //fixme isBound here should take the arguements in to consideration.
-        return isBound(routingKey, queue);
-    }
-
-    public boolean isBound(AMQShortString routingKey, AMQQueue queue)
-    {
-        String bindingKey = (routingKey == null) ? "" : routingKey.toString();
-        CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
-        
-        if(bindings != null)
-        {
-            for(Binding binding : bindings)
-            {
-                if(binding.getQueue().equals(queue))
-                {
-                    return true;
-                }
-            }
-        }
-        
-        return false;
-    }
-
-    public boolean isBound(AMQShortString routingKey)
-    {
-        String bindingKey = (routingKey == null) ? "" : routingKey.toString();
-        CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
-        return bindings != null && !bindings.isEmpty();
-    }
-
-    public boolean isBound(AMQQueue queue)
-    {
-        for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values())
-        {
-            for(Binding binding : bindings)
-            {
-                if(binding.getQueue().equals(queue))
-                {
-                    return true;
-                }
-            }
-        }
-        
-        return false;
-    }
 
-    public boolean hasBindings()
-    {
-        return !getBindings().isEmpty();
+        return new ArrayList<BaseQueue>(queues);
     }
 
     protected void onBind(final Binding binding)
@@ -216,7 +135,7 @@ public class HeadersExchange extends Abs
                 bindings = newBindings;
             }
         }
-        
+
         if(_logger.isDebugEnabled())
         {
             _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() +

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Sat Jun  1 19:24:36 2013
@@ -20,21 +20,15 @@
  */
 package org.apache.qpid.server.exchange;
 
-import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.filter.SelectorParsingException;
-import org.apache.qpid.filter.selector.ParseException;
-import org.apache.qpid.filter.selector.TokenMgrError;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
@@ -42,14 +36,10 @@ import org.apache.qpid.server.exchange.t
 import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
 import org.apache.qpid.server.exchange.topic.TopicNormalizer;
 import org.apache.qpid.server.exchange.topic.TopicParser;
-import org.apache.qpid.server.filter.JMSSelectorFilter;
-import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.Filterable;
 
 public class TopicExchange extends AbstractExchange
 {
@@ -65,8 +55,6 @@ public class TopicExchange extends Abstr
 
     private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
 
-    private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
-
     public TopicExchange()
     {
         super(TYPE);
@@ -77,7 +65,7 @@ public class TopicExchange extends Abstr
         AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ;
         AMQQueue queue = binding.getQueue();
         FieldTable args = FieldTable.convertToFieldTable(binding.getArguments());
-        
+
         assert queue != null;
         assert rKey != null;
 
@@ -91,26 +79,26 @@ public class TopicExchange extends Abstr
             FieldTable oldArgs = _bindings.get(binding);
             TopicExchangeResult result = _topicExchangeResults.get(routingKey);
 
-            if(argumentsContainFilter(args))
+            if(FilterSupport.argumentsContainFilter(args))
             {
-                if(argumentsContainFilter(oldArgs))
+                if(FilterSupport.argumentsContainFilter(oldArgs))
                 {
                     result.replaceQueueFilter(queue,
-                                              createMessageFilter(oldArgs, queue),
-                                              createMessageFilter(args, queue));
+                                              FilterSupport.createMessageFilter(oldArgs, queue),
+                                              FilterSupport.createMessageFilter(args, queue));
                 }
                 else
                 {
-                    result.addFilteredQueue(queue, createMessageFilter(args, queue));
+                    result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
                     result.removeUnfilteredQueue(queue);
                 }
             }
             else
             {
-                if(argumentsContainFilter(oldArgs))
+                if(FilterSupport.argumentsContainFilter(oldArgs))
                 {
                     result.addUnfilteredQueue(queue);
-                    result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue));
+                    result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue));
                 }
                 else
                 {
@@ -118,7 +106,7 @@ public class TopicExchange extends Abstr
                     return;
                 }
             }
-            
+
             result.addBinding(binding);
 
         }
@@ -129,9 +117,9 @@ public class TopicExchange extends Abstr
             if(result == null)
             {
                 result = new TopicExchangeResult();
-                if(argumentsContainFilter(args))
+                if(FilterSupport.argumentsContainFilter(args))
                 {
-                    result.addFilteredQueue(queue, createMessageFilter(args, queue));
+                    result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
                 }
                 else
                 {
@@ -142,89 +130,22 @@ public class TopicExchange extends Abstr
             }
             else
             {
-                if(argumentsContainFilter(args))
+                if(FilterSupport.argumentsContainFilter(args))
                 {
-                    result.addFilteredQueue(queue, createMessageFilter(args, queue));
+                    result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
                 }
                 else
                 {
                     result.addUnfilteredQueue(queue);
                 }
             }
-            
+
             result.addBinding(binding);
             _bindings.put(binding, args);
         }
 
     }
 
-    private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
-    {
-        if(argumentsContainNoLocal(args))
-        {
-            MessageFilter filter = new NoLocalFilter(queue);
-
-            if(argumentsContainJMSSelector(args))
-            {
-                filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
-            }
-            return filter;
-        }
-        else
-        {
-            return createJMSSelectorFilter(args);
-        }
-
-    }
-
-
-    private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException
-    {
-        final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
-        WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
-        JMSSelectorFilter selector = null;
-
-        if(selectorRef == null || (selector = selectorRef.get())==null)
-        {
-            try
-            {
-                selector = new JMSSelectorFilter(selectorString);
-            }
-            catch (ParseException e)
-            {
-                throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
-            }
-            catch (SelectorParsingException e)
-            {
-                throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
-            }
-            catch (TokenMgrError e)
-            {
-                throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
-            }
-            _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
-        }
-        return selector;
-    }
-
-    private static boolean argumentsContainFilter(final FieldTable args)
-    {
-        return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
-    }
-
-    private static boolean argumentsContainNoLocal(final FieldTable args)
-    {
-        return args != null
-                && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue())
-                && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue()));
-    }
-
-    private static boolean argumentsContainJMSSelector(final FieldTable args)
-    {
-        return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())
-                       && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0);
-    }
-
 
     public ArrayList<BaseQueue> doRoute(InboundMessage payload)
     {
@@ -256,87 +177,6 @@ public class TopicExchange extends Abstr
 
     }
 
-    public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
-    {
-        Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments));
-        
-        if (arguments == null)
-        {
-            return _bindings.containsKey(binding);
-        }
-        else
-        {
-            FieldTable o = _bindings.get(binding);
-            if (o != null)
-            {
-                return o.equals(arguments);
-            }
-            else
-            {
-                return false;
-            }
-
-        }
-    }
-
-    public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
-    {
-        Binding binding = new Binding(null, bindingKey, queue, this, arguments);
-        if (arguments == null)
-        {
-            return _bindings.containsKey(binding);
-        }
-        else
-        {
-            FieldTable o = _bindings.get(binding);
-            if (o != null)
-            {
-                return arguments.equals(FieldTable.convertToMap(o));
-            }
-            else
-            {
-                return false;
-            }
-        }
-
-    }
-
-    public boolean isBound(AMQShortString routingKey, AMQQueue queue)
-    {
-        return isBound(routingKey, null, queue);
-    }
-
-    public boolean isBound(AMQShortString routingKey)
-    {
-        for(Binding b : _bindings.keySet())
-        {
-            if(b.getBindingKey().equals(routingKey.toString()))
-            {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    public boolean isBound(AMQQueue queue)
-    {
-        for(Binding b : _bindings.keySet())
-        {
-            if(b.getQueue().equals(queue))
-            {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    public boolean hasBindings()
-    {
-        return !_bindings.isEmpty();
-    }
-
     private boolean deregisterQueue(final Binding binding)
     {
         if(_bindings.containsKey(binding))
@@ -344,14 +184,15 @@ public class TopicExchange extends Abstr
             FieldTable bindingArgs = _bindings.remove(binding);
             AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
             TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
-            
+
             result.removeBinding(binding);
-            
-            if(argumentsContainFilter(bindingArgs))
+
+            if(FilterSupport.argumentsContainFilter(bindingArgs))
             {
                 try
                 {
-                    result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue()));
+                    result.removeFilteredQueue(binding.getQueue(), FilterSupport.createMessageFilter(bindingArgs,
+                            binding.getQueue()));
                 }
                 catch (AMQInvalidArgumentException e)
                 {
@@ -418,96 +259,4 @@ public class TopicExchange extends Abstr
         deregisterQueue(binding);
     }
 
-    private static final class NoLocalFilter implements MessageFilter
-    {
-        private final AMQQueue _queue;
-
-        public NoLocalFilter(AMQQueue queue)
-        {
-            _queue = queue;
-        }
-
-        public boolean matches(Filterable message)
-        {
-            InboundMessage inbound = (InboundMessage) message;
-            final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
-            return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
-
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o)
-            {
-                return true;
-            }
-
-            if (o == null || getClass() != o.getClass())
-            {
-                return false;
-            }
-
-            NoLocalFilter that = (NoLocalFilter) o;
-
-            return _queue == null ? that._queue == null : _queue.equals(that._queue);
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return _queue != null ? _queue.hashCode() : 0;
-        }
-    }
-
-    private static final class CompoundFilter implements MessageFilter
-    {
-        private MessageFilter _noLocalFilter;
-        private MessageFilter _jmsSelectorFilter;
-
-        public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
-        {
-            _noLocalFilter = filter;
-            _jmsSelectorFilter = jmsSelectorFilter;
-        }
-
-        public boolean matches(Filterable message)
-        {
-            return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o)
-            {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass())
-            {
-                return false;
-            }
-
-            CompoundFilter that = (CompoundFilter) o;
-
-            if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
-            {
-                return false;
-            }
-            if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
-            {
-                return false;
-            }
-
-            return true;
-        }
-
-        @Override
-        public int hashCode()
-        {
-            int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
-            result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
-            return result;
-        }
-    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Sat Jun  1 19:24:36 2013
@@ -159,9 +159,15 @@ public class ExchangeBoundHandler implem
                 else
                 {
 
+                    String message = "Queue " + queueName + " not bound with routing key " +
+                                        body.getRoutingKey() + " to exchange " + exchangeName;
+
+                    if(message.length()>255)
+                    {
+                        message = message.substring(0,254);
+                    }
                     response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
-                        new AMQShortString("Queue " + queueName + " not bound with routing key " +
-                        body.getRoutingKey() + " to exchange " + exchangeName));	// replyText
+                        new AMQShortString(message));	// replyText
                 }
             }
         }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Sat Jun  1 19:24:36 2013
@@ -1130,22 +1130,22 @@ public class ServerSessionDelegate exten
 
                     if(queueMatched)
                     {
-                        result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
+                        final boolean keyMatched = exchange.isBound(method.getBindingKey(), queue);
+                        result.setKeyNotMatched(!keyMatched);
+                        if(method.hasArguments() && keyMatched)
+                        {
+                            result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), queue));
+                        }
                     }
                     else
                     {
                         result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
                     }
 
-                    if(method.hasArguments())
-                    {
-                        result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queueMatched ? queue : null));
-                    }
-
                 }
                 else if (method.hasArguments())
                 {
-                    result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), queueMatched ? queue : null));
+                    result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue));
                 }
 
             }
@@ -1166,7 +1166,7 @@ public class ServerSessionDelegate exten
         {
             if(method.hasArguments())
             {
-                result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), null));
+                result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments()));
             }
             result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java Sat Jun  1 19:24:36 2013
@@ -21,22 +21,32 @@
 package org.apache.qpid.server.exchange;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anySet;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import junit.framework.TestCase;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class FanoutExchangeTest extends TestCase
 {
@@ -51,7 +61,9 @@ public class FanoutExchangeTest extends 
         _virtualHost = mock(VirtualHost.class);
         SecurityManager securityManager = mock(SecurityManager.class);
         when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
-        when(securityManager.authoriseBind(any(Exchange.class),any(AMQQueue.class),any(AMQShortString.class))).thenReturn(true);
+        when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true);
+        when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true);
+
         _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
     }
 
@@ -76,14 +88,14 @@ public class FanoutExchangeTest extends 
     {
         AMQQueue queue = bindQueue();
         assertTrue("Should return true for a bound queue",
-                _exchange.isBound((AMQShortString) null, (FieldTable) null, queue));
+                _exchange.isBound(new AMQShortString("matters"), (FieldTable) null, queue));
     }
 
     public void testIsBoundAMQShortStringAMQQueue() throws AMQSecurityException, AMQInternalException
     {
         AMQQueue queue = bindQueue();
         assertTrue("Should return true for a bound queue",
-                _exchange.isBound((AMQShortString) null, queue));
+                _exchange.isBound(new AMQShortString("matters"), queue));
     }
 
     public void testIsBoundAMQQueue() throws AMQSecurityException, AMQInternalException
@@ -95,9 +107,86 @@ public class FanoutExchangeTest extends 
 
     private AMQQueue bindQueue() throws AMQSecurityException, AMQInternalException
     {
+        AMQQueue queue = mockQueue();
+        _exchange.addBinding("matters", queue, null);
+        return queue;
+    }
+
+    private AMQQueue mockQueue()
+    {
         AMQQueue queue = mock(AMQQueue.class);
         when(queue.getVirtualHost()).thenReturn(_virtualHost);
-        _exchange.addBinding("does not matter", queue, null);
         return queue;
     }
+
+    public void testRoutingWithSelectors() throws Exception
+    {
+        AMQQueue queue1 = mockQueue();
+        AMQQueue queue2 = mockQueue();
+
+        _exchange.addBinding("key",queue1, null);
+        _exchange.addBinding("key",queue2, null);
+
+
+        List<? extends BaseQueue> result = _exchange.route(mockMessage(true));
+
+        assertEquals("Expected message to be routed to both queues", 2, result.size());
+        assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+        assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+        _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
+
+
+        result = _exchange.route(mockMessage(true));
+
+        assertEquals("Expected message to be routed to both queues", 2, result.size());
+        assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+        assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+        _exchange.removeBinding("key",queue2,null);
+
+        result = _exchange.route(mockMessage(true));
+
+        assertEquals("Expected message to be routed to both queues", 2, result.size());
+        assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+        assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+
+        result = _exchange.route(mockMessage(false));
+
+        assertEquals("Expected message to be routed to queue1 only", 1, result.size());
+        assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+        assertFalse("Expected queue2 not to be routed to", result.contains(queue2));
+
+        _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
+
+
+        result = _exchange.route(mockMessage(false));
+        assertEquals("Expected message to be routed to both queues", 2, result.size());
+        assertTrue("Expected queue1 to be routed to", result.contains(queue1));
+        assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+
+
+    }
+
+    private InboundMessage mockMessage(boolean val)
+    {
+        final AMQMessageHeader header = mock(AMQMessageHeader.class);
+        when(header.containsHeader("select")).thenReturn(true);
+        when(header.getHeader("select")).thenReturn(val);
+        when(header.getHeaderNames()).thenReturn(Collections.singleton("select"));
+        when(header.containsHeaders(anySet())).then(new Answer<Object>()
+        {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable
+            {
+                final Set names = (Set) invocation.getArguments()[0];
+                return names.size() == 1 && names.contains("select");
+
+            }
+        });
+        final InboundMessage inboundMessage = mock(InboundMessage.class);
+        when(inboundMessage.getMessageHeader()).thenReturn(header);
+        return inboundMessage;
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Sat Jun  1 19:24:36 2013
@@ -20,106 +20,230 @@
  */
 package org.apache.qpid.server.exchange;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.server.util.BrokerTestHelper;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import junit.framework.TestCase;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
+public class HeadersExchangeTest extends TestCase
 {
-    private AMQProtocolSession _protocolSession;
+    private HeadersExchange _exchange;
+    private VirtualHost _virtualHost;
 
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
-        BrokerTestHelper.setUp();
-        _protocolSession = new InternalTestProtocolSession(getVirtualHost(), BrokerTestHelper.createBrokerMock());
+
+        CurrentActor.setDefault(mock(LogActor.class));
+        _exchange = new HeadersExchange();
+        _virtualHost = mock(VirtualHost.class);
+        SecurityManager securityManager = mock(SecurityManager.class);
+        when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
+        when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true);
+        when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true);
+
+        _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
+
     }
 
-    @Override
-    public void tearDown() throws Exception
+    protected void routeAndTest(InboundMessage msg, AMQQueue... expected) throws Exception
+    {
+        List<? extends BaseQueue> results = _exchange.route(msg);
+        List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
+        unexpected.removeAll(Arrays.asList(expected));
+        assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());
+        List<? extends BaseQueue> missing = new ArrayList<BaseQueue>(Arrays.asList(expected));
+        missing.removeAll(results);
+        assertTrue("Message not delivered to expected queues: " + missing, missing.isEmpty());
+        assertTrue("Duplicates " + results, results.size()==(new HashSet<BaseQueue>(results)).size());
+    }
+
+
+    private AMQQueue createAndBind(final String name, String... arguments)
+            throws Exception
+    {
+        return createAndBind(name, getArgsMapFromStrings(arguments));
+    }
+
+    private Map<String, Object> getArgsMapFromStrings(String... arguments)
+    {
+        Map<String, Object> map = new HashMap<String,Object>();
+
+        for(String arg : arguments)
+        {
+            if(arg.contains("="))
+            {
+                String[] keyValue = arg.split("=",2);
+                map.put(keyValue[0],keyValue[1]);
+            }
+            else
+            {
+                map.put(arg,null);
+            }
+        }
+        return map;
+    }
+
+    private AMQQueue createAndBind(final String name, Map<String, Object> arguments)
+            throws Exception
+    {
+        AMQQueue q = create(name);
+        bind(name, arguments, q);
+        return q;
+    }
+
+    private void bind(String bindingKey, Map<String, Object> arguments, AMQQueue q)
+            throws AMQSecurityException, AMQInternalException
+    {
+        _exchange.addBinding(bindingKey,q,arguments);
+    }
+
+    private AMQQueue create(String name)
+    {
+        AMQQueue q = mock(AMQQueue.class);
+        when(q.toString()).thenReturn(name);
+        when(q.getVirtualHost()).thenReturn(_virtualHost);
+        return q;
+    }
+
+
+    public void testSimple() throws Exception
+    {
+        AMQQueue q1 = createAndBind("Q1", "F0000");
+        AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark");
+        AMQQueue q3 = createAndBind("Q3", "F0001");
+        AMQQueue q4 = createAndBind("Q4", "F0001=Bear");
+        AMQQueue q5 = createAndBind("Q5", "F0000", "F0001");
+        AMQQueue q6 = createAndBind("Q6", "F0000=Aardvark", "F0001=Bear");
+        AMQQueue q7 = createAndBind("Q7", "F0000", "F0001=Bear");
+        AMQQueue q8 = createAndBind("Q8", "F0000=Aardvark", "F0001");
+
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q5, q8);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q3, q4, q5, q7);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")),
+                q1, q2, q3, q4, q5, q6, q7, q8);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0002")));
+
+    }
+
+    public void testAny() throws Exception
+    {
+        AMQQueue q1 = createAndBind("Q1", "F0000", "F0001", "X-match=any");
+        AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark", "F0001=Bear", "X-match=any");
+        AMQQueue q3 = createAndBind("Q3", "F0000", "F0001=Bear", "X-match=any");
+        AMQQueue q4 = createAndBind("Q4", "F0000=Aardvark", "F0001", "X-match=any");
+        AMQQueue q5 = createAndBind("Q5", "F0000=Apple", "F0001", "X-match=any");
+
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1, q3);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2, q3, q4);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q4, q5);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q2, q3, q4, q5);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), q1, q2, q3, q4, q5);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0002")));
+    }
+
+    public void testOnUnbind() throws Exception
+    {
+        AMQQueue q1 = createAndBind("Q1", "F0000");
+        AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark");
+        AMQQueue q3 = createAndBind("Q3", "F0001");
+
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0001")), q3);
+
+        _exchange.removeBinding("Q1",q1,getArgsMapFromStrings("F0000"));
+
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000")));
+        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q2);
+    }
+
+
+    public void testWithSelectors() throws Exception
     {
-        BrokerTestHelper.tearDown();
-        super.tearDown();
+        AMQQueue q1 = create("Q1");
+        AMQQueue q2 = create("Q2");
+        bind("q1",getArgsMapFromStrings("F"), q1);
+        bind("q1select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q1);
+        bind("q2",getArgsMapFromStrings("F=1"), q2);
+
+        routeAndTest(mockMessage(getArgsMapFromStrings("F")),q1);
+
+        routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2);
+
+
+        AMQQueue q3 = create("Q3");
+        bind("q3select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q3);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2,q3);
+        routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1);
+        bind("q3select2",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='2'"), q3);
+
+        routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1,q3);
+
     }
 
-    public void testSimple() throws AMQException
+    private InboundMessage mockMessage(final Map<String, Object> headerValues)
     {
-        TestQueue q1 = bindDefault("F0000");
-        TestQueue q2 = bindDefault("F0000=Aardvark");
-        TestQueue q3 = bindDefault("F0001");
-        TestQueue q4 = bindDefault("F0001=Bear");
-        TestQueue q5 = bindDefault("F0000", "F0001");
-        TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear");
-        TestQueue q7 = bindDefault("F0000", "F0001=Bear");
-        TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
-
-        routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1);
-        routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2);
-        routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
-        routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
-        routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"),
-                     q1, q2, q3, q4, q5, q6, q7, q8);
-        routeAndTest(new Message(_protocolSession, "Message6", "F0002"));
-
-        Message m7 = new Message(_protocolSession, "Message7", "XXXXX");
-
-        MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo());
-        pb7.setMandatory(true);
-        routeAndTest(m7,true);
-
-        Message m8 = new Message(_protocolSession, "Message8", "F0000");
-        MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo());
-        pb8.setMandatory(true);
-        routeAndTest(m8,false,q1);
-
-
-    }
-
-    public void testAny() throws AMQException
-    {
-        TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any");
-        TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any");
-        TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any");
-        TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
-        TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
-
-        routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1, q3);
-        routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2, q3, q4);
-        routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
-        routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
-        routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
-        routeAndTest(new Message(_protocolSession, "Message6", "F0002"));
-    }
-
-    public void testMandatory() throws AMQException
-    {
-        bindDefault("F0000");
-        Message m1 = new Message(_protocolSession, "Message1", "XXXXX");
-        Message m2 = new Message(_protocolSession, "Message2", "F0000");
-        MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo());
-        pb1.setMandatory(true);
-        MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo());
-        pb2.setMandatory(true);
-        routeAndTest(m1,true);
-    }
-    
-    public void testOnUnbind() throws AMQException
-    {
-        TestQueue q1 = bindDefault("F0000");
-        TestQueue q2 = bindDefault("F0000=Aardvark");
-        TestQueue q3 = bindDefault("F0001");
-        
-        routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1);
-        routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2);
-        routeAndTest(new Message(_protocolSession, "Message3", "F0001"), q3);
-        
-        unbind(q1,"F0000");
-        routeAndTest(new Message(_protocolSession, "Message4", "F0000"));
-        routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark"), q2);
+        final AMQMessageHeader header = mock(AMQMessageHeader.class);
+        when(header.containsHeader(anyString())).then(new Answer<Boolean>()
+        {
+            @Override
+            public Boolean answer(InvocationOnMock invocation) throws Throwable
+            {
+                return headerValues.containsKey((String) invocation.getArguments()[0]);
+            }
+        });
+        when(header.getHeader(anyString())).then(new Answer<Object>()
+        {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable
+            {
+                return headerValues.get((String) invocation.getArguments()[0]);
+            }
+        });
+        when(header.getHeaderNames()).thenReturn(headerValues.keySet());
+        when(header.containsHeaders(anySet())).then(new Answer<Boolean>()
+        {
+            @Override
+            public Boolean answer(InvocationOnMock invocation) throws Throwable
+            {
+                final Set names = (Set) invocation.getArguments()[0];
+                return headerValues.keySet().containsAll(names);
+
+            }
+        });
+        final InboundMessage inboundMessage = mock(InboundMessage.class);
+        when(inboundMessage.getMessageHeader()).thenReturn(header);
+        return inboundMessage;
     }
-    
 
     public static junit.framework.Test suite()
     {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java Sat Jun  1 19:24:36 2013
@@ -31,7 +31,7 @@ public class AMQHeadersExchange extends 
 {
     public AMQHeadersExchange(BindingURL binding)
     {
-        this(binding.getExchangeName());
+        super(binding);
     }
 
     public AMQHeadersExchange(String name)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org