You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/06/01 21:24:36 UTC
svn commit: r1488561 [1/3] - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/server/transport/
broker/src/test/java/org/apache/qpi...
Author: rgodfrey
Date: Sat Jun 1 19:24:36 2013
New Revision: 1488561
URL: http://svn.apache.org/r1488561
Log:
QPID-4897 : [Java Broker] Allow selectors on bindings for non-topic exchanges
Added:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java (with props)
Removed:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Sat Jun 1 19:24:36 2013
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -173,20 +174,106 @@ public abstract class AbstractExchange i
return getVirtualHost().getQueueRegistry();
}
- public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue)
+ public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue)
{
- return isBound(new AMQShortString(bindingKey), queue);
+ return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue);
}
+ public final boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue)
+ {
+ for(Binding b : _bindings)
+ {
+ if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue())
+ {
+ return (b.getArguments() == null || b.getArguments().isEmpty())
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments);
+ }
+ }
+ return false;
+ }
+
+ public final boolean isBound(AMQShortString routingKey, AMQQueue queue)
+ {
+ return isBound(routingKey==null ? "" : routingKey.asString(), queue);
+ }
+
+ public final boolean isBound(String bindingKey, AMQQueue queue)
+ {
+ for(Binding b : _bindings)
+ {
+ if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public final boolean isBound(AMQShortString routingKey)
+ {
+ return isBound(routingKey == null ? "" : routingKey.asString());
+ }
+
+ public final boolean isBound(String bindingKey)
+ {
+ for(Binding b : _bindings)
+ {
+ if(bindingKey.equals(b.getBindingKey()))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public final boolean isBound(AMQQueue queue)
+ {
+ for(Binding b : _bindings)
+ {
+ if(queue == b.getQueue())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
- public boolean isBound(String bindingKey, AMQQueue queue)
+ @Override
+ public final boolean isBound(Map<String, Object> arguments, AMQQueue queue)
{
- return isBound(new AMQShortString(bindingKey), queue);
+ for(Binding b : _bindings)
+ {
+ if(queue == b.getQueue() &&
+ ((b.getArguments() == null || b.getArguments().isEmpty())
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments)))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public final boolean isBound(String bindingKey, Map<String, Object> arguments)
+ {
+ for(Binding b : _bindings)
+ {
+ if(b.getBindingKey().equals(bindingKey) &&
+ ((b.getArguments() == null || b.getArguments().isEmpty())
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments)))
+ {
+ return true;
+ }
+ }
+ return false;
}
- public boolean isBound(String bindingKey)
+ public final boolean hasBindings()
{
- return isBound(new AMQShortString(bindingKey));
+ return !_bindings.isEmpty();
}
public Exchange getAlternateExchange()
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Sat Jun 1 19:24:36 2013
@@ -272,6 +272,18 @@ public class DefaultExchange implements
}
@Override
+ public boolean isBound(Map<String, Object> arguments, AMQQueue queue)
+ {
+ return (arguments == null || arguments.isEmpty()) && isBound(queue);
+ }
+
+ @Override
+ public boolean isBound(String bindingKey, Map<String, Object> arguments)
+ {
+ return (arguments == null || arguments.isEmpty()) && isBound(bindingKey);
+ }
+
+ @Override
public boolean isBound(String bindingKey)
{
return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Sat Jun 1 19:24:36 2013
@@ -20,9 +20,18 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -36,10 +45,14 @@ import java.util.concurrent.CopyOnWriteA
public class DirectExchange extends AbstractExchange
{
+
+ private static final Logger _logger = Logger.getLogger(DirectExchange.class);
+
private static final class BindingSet
{
private CopyOnWriteArraySet<Binding> _bindings = new CopyOnWriteArraySet<Binding>();
- private List<BaseQueue> _queues = new ArrayList<BaseQueue>();
+ private List<BaseQueue> _unfilteredQueues = new ArrayList<BaseQueue>();
+ private Map<BaseQueue, MessageFilter> _filteredQueues = new HashMap<BaseQueue, MessageFilter>();
public synchronized void addBinding(Binding binding)
{
@@ -56,27 +69,59 @@ public class DirectExchange extends Abst
private void recalculateQueues()
{
List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
+ Map<BaseQueue, MessageFilter> filteredQueues = new HashMap<BaseQueue,MessageFilter>();
for(Binding b : _bindings)
{
- if(!queues.contains(b.getQueue()))
+
+ if(FilterSupport.argumentsContainFilter(b.getArguments()))
{
- queues.add(b.getQueue());
+ try
+ {
+ MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getQueue());
+ filteredQueues.put(b.getQueue(),filter);
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ _logger.warn("Binding ignored: cannot parse filter on binding of queue '"+b.getQueue().getName()
+ + "' to exchange '" + b.getExchange().getName()
+ + "' with arguments: " + b.getArguments(), e);
+ }
+
+ }
+ else
+ {
+
+ if(!queues.contains(b.getQueue()))
+ {
+ queues.add(b.getQueue());
+ }
}
}
- _queues = queues;
+ _unfilteredQueues = queues;
+ _filteredQueues = filteredQueues;
}
- public List<BaseQueue> getQueues()
+ public List<BaseQueue> getUnfilteredQueues()
{
- return _queues;
+ return _unfilteredQueues;
}
public CopyOnWriteArraySet<Binding> getBindings()
{
return _bindings;
}
+
+ public boolean hasFilteredQueues()
+ {
+ return !_filteredQueues.isEmpty();
+ }
+
+ public Map<BaseQueue,MessageFilter> getFilteredQueues()
+ {
+ return _filteredQueues;
+ }
}
private final ConcurrentHashMap<String, BindingSet> _bindingsByKey =
@@ -98,7 +143,30 @@ public class DirectExchange extends Abst
if(bindings != null)
{
- return bindings.getQueues();
+ List<BaseQueue> queues = bindings.getUnfilteredQueues();
+
+ if(bindings.hasFilteredQueues())
+ {
+ Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues);
+
+ Map<BaseQueue, MessageFilter> filteredQueues = bindings.getFilteredQueues();
+ for(Map.Entry<BaseQueue, MessageFilter> entry : filteredQueues.entrySet())
+ {
+ if(!queuesSet.contains(entry.getKey()))
+ {
+ MessageFilter filter = entry.getValue();
+ if(filter.matches(payload))
+ {
+ queuesSet.add(entry.getKey());
+ }
+ }
+ }
+ if(queues.size() != queuesSet.size())
+ {
+ queues = new ArrayList<BaseQueue>(queuesSet);
+ }
+ }
+ return queues;
}
else
{
@@ -106,50 +174,6 @@ public class DirectExchange extends Abst
}
-
- }
-
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- return isBound(routingKey,queue);
- }
-
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- BindingSet bindings = _bindingsByKey.get(bindingKey);
- if(bindings != null)
- {
- return bindings.getQueues().contains(queue);
- }
- return false;
-
- }
-
- public boolean isBound(AMQShortString routingKey)
- {
- String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- BindingSet bindings = _bindingsByKey.get(bindingKey);
- return bindings != null && !bindings.getQueues().isEmpty();
- }
-
- public boolean isBound(AMQQueue queue)
- {
-
- for (BindingSet bindings : _bindingsByKey.values())
- {
- if(bindings.getQueues().contains(queue))
- {
- return true;
- }
-
- }
- return false;
- }
-
- public boolean hasBindings()
- {
- return !getBindings().isEmpty();
}
protected void onBind(final Binding binding)
@@ -189,5 +213,4 @@ public class DirectExchange extends Abst
}
-
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Sat Jun 1 19:24:36 2013
@@ -145,12 +145,15 @@ public interface Exchange extends Exchan
Collection<Binding> getBindings();
+ boolean isBound(String bindingKey);
boolean isBound(String bindingKey, AMQQueue queue);
- public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue);
+ boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue);
- boolean isBound(String bindingKey);
+ boolean isBound(Map<String, Object> arguments, AMQQueue queue);
+
+ boolean isBound(String bindingKey, Map<String, Object> arguments);
void removeReference(ExchangeReferrer exchange);
@@ -158,6 +161,8 @@ public interface Exchange extends Exchan
boolean hasReferrers();
+
+
public interface BindingListener
{
void bindingAdded(Exchange exchange, Binding binding);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Sat Jun 1 19:24:36 2013
@@ -20,11 +20,21 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -42,7 +52,18 @@ public class FanoutExchange extends Abst
/**
* Maps from queue name to queue instances
*/
- private final ConcurrentHashMap<AMQQueue,Integer> _queues = new ConcurrentHashMap<AMQQueue,Integer>();
+ private final Map<AMQQueue,Integer> _queues = new HashMap<AMQQueue,Integer>();
+ private final CopyOnWriteArrayList<AMQQueue> _unfilteredQueues = new CopyOnWriteArrayList<AMQQueue>();
+ private final CopyOnWriteArrayList<AMQQueue> _filteredQueues = new CopyOnWriteArrayList<AMQQueue>();
+
+ private final AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>> _filteredBindings =
+ new AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>>();
+ {
+ Map<AMQQueue,Map<Binding, MessageFilter>> emptyMap = Collections.emptyMap();
+ _filteredBindings.set(emptyMap);
+ }
+
+
public static final ExchangeType<FanoutExchange> TYPE = new FanoutExchangeType();
@@ -54,115 +75,150 @@ public class FanoutExchange extends Abst
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Publishing message to queue " + _queues);
- }
-
for(Binding b : getBindings())
{
b.incrementMatches();
}
- return new ArrayList<BaseQueue>(_queues.keySet());
-
- }
+ final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues);
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- return isBound(routingKey, queue);
- }
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- return isBound(queue);
- }
+ final Map<AMQQueue, Map<Binding, MessageFilter>> filteredBindings = _filteredBindings.get();
+ if(!_filteredQueues.isEmpty())
+ {
+ for(AMQQueue q : _filteredQueues)
+ {
+ final Map<Binding, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q);
+ if(!(bindingMessageFilterMap == null || result.contains(q)))
+ {
+ for(MessageFilter filter : bindingMessageFilterMap.values())
+ {
+ if(filter.matches(payload))
+ {
+ result.add(q);
+ break;
+ }
+ }
+ }
+ }
- public boolean isBound(AMQShortString routingKey)
- {
+ }
- return (_queues != null) && !_queues.isEmpty();
- }
- public boolean isBound(AMQQueue queue)
- {
- if (queue == null)
+ if (_logger.isDebugEnabled())
{
- return false;
+ _logger.debug("Publishing message to queue " + result);
}
- return _queues.containsKey(queue);
- }
- public boolean hasBindings()
- {
- return !_queues.isEmpty();
+ return result;
+
}
- protected void onBind(final Binding binding)
+
+ protected synchronized void onBind(final Binding binding)
{
AMQQueue queue = binding.getQueue();
assert queue != null;
+ if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
+ {
- Integer oldVal;
+ Integer oldVal;
+ if(_queues.containsKey(queue))
+ {
+ _queues.put(queue,_queues.get(queue)+1);
+ }
+ else
+ {
+ _queues.put(queue, ONE);
+ _unfilteredQueues.add(queue);
+ // No longer any reason to check filters for this queue
+ _filteredQueues.remove(queue);
+ }
- if((oldVal = _queues.putIfAbsent(queue, ONE)) != null)
+ }
+ else
{
- Integer newVal = oldVal+1;
- while(!_queues.replace(queue, oldVal, newVal))
+ try
{
- oldVal = _queues.get(queue);
- if(oldVal == null)
+
+ HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings =
+ new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get());
+
+ Map<Binding, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue());
+ final
+ MessageFilter messageFilter =
+ FilterSupport.createMessageFilter(binding.getArguments(), binding.getQueue());
+
+ if(bindingsForQueue != null)
{
- oldVal = _queues.putIfAbsent(queue, ONE);
- if(oldVal == null)
+ bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue);
+ bindingsForQueue.put(binding, messageFilter);
+ }
+ else
+ {
+ bindingsForQueue = Collections.singletonMap(binding, messageFilter);
+ if(!_unfilteredQueues.contains(queue))
{
- break;
+ _filteredQueues.add(queue);
}
}
- newVal = oldVal + 1;
+
+ filteredBindings.put(binding.getQueue(), bindingsForQueue);
+
+ _filteredBindings.set(filteredBindings);
+
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ _logger.warn("Cannoy bind queue " + queue + " to exchange this " + this + " beacuse selector cannot be parsed.", e);
+ return;
}
}
-
if (_logger.isDebugEnabled())
{
_logger.debug("Binding queue " + queue
- + " with routing key " + new AMQShortString(binding.getBindingKey()) + " to exchange " + this);
+ + " with routing key " + binding.getBindingKey() + " to exchange " + this);
}
}
- protected void onUnbind(final Binding binding)
+ protected synchronized void onUnbind(final Binding binding)
{
AMQQueue queue = binding.getQueue();
- Integer oldValue = _queues.get(queue);
-
- boolean done = false;
-
- while(!(done || oldValue == null))
+ if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
{
- while(!(done || oldValue == null) && oldValue.intValue() == 1)
+ Integer oldValue = _queues.remove(queue);
+ if(ONE.equals(oldValue))
{
- if(!_queues.remove(queue, oldValue))
+ // should start checking filters for this queue
+ if(_filteredBindings.get().containsKey(queue))
{
- oldValue = _queues.get(queue);
- }
- else
- {
- done = true;
+ _filteredQueues.add(queue);
}
+ _unfilteredQueues.remove(queue);
}
- while(!(done || oldValue == null) && oldValue.intValue() != 1)
+ else
{
- Integer newValue = oldValue - 1;
- if(!_queues.replace(queue, oldValue, newValue))
- {
- oldValue = _queues.get(queue);
- }
- else
- {
- done = true;
- }
+ _queues.put(queue,oldValue-1);
}
}
+ else // we are removing a binding with filters
+ {
+ HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings =
+ new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get());
+
+ Map<Binding,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue());
+ if(bindingsForQueue.size()>1)
+ {
+ bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue);
+ bindingsForQueue.remove(binding);
+ filteredBindings.put(binding.getQueue(),bindingsForQueue);
+ }
+ else
+ {
+ _filteredQueues.remove(queue);
+ }
+ _filteredBindings.set(filteredBindings);
+
+ }
}
}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java?rev=1488561&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java Sat Jun 1 19:24:36 2013
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.exchange;
+
+import java.lang.ref.WeakReference;
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.Filterable;
+
+public class FilterSupport
+{
+ private static final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache =
+ Collections.synchronizedMap(new WeakHashMap<String, WeakReference<JMSSelectorFilter>>());
+
+ static MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException
+ {
+ final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
+ return getMessageFilter(selectorString);
+ }
+
+
+ static MessageFilter createJMSSelectorFilter(Map<String, Object> args) throws AMQInvalidArgumentException
+ {
+ final String selectorString = (String) args.get(AMQPFilterTypes.JMS_SELECTOR.getValue());
+ return getMessageFilter(selectorString);
+ }
+
+
+ private static MessageFilter getMessageFilter(String selectorString) throws AMQInvalidArgumentException
+ {
+ WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
+ JMSSelectorFilter selector = null;
+
+ if(selectorRef == null || (selector = selectorRef.get())==null)
+ {
+ try
+ {
+ selector = new JMSSelectorFilter(selectorString);
+ }
+ catch (ParseException e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
+ }
+ catch (SelectorParsingException e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
+ }
+ catch (TokenMgrError e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
+ }
+ _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
+ }
+ return selector;
+ }
+
+ static boolean argumentsContainFilter(final FieldTable args)
+ {
+ return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
+ }
+
+
+ static boolean argumentsContainFilter(final Map<String, Object> args)
+ {
+ return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
+ }
+
+
+ static boolean argumentsContainNoLocal(final Map<String, Object> args)
+ {
+ return args != null
+ && args.containsKey(AMQPFilterTypes.NO_LOCAL.toString())
+ && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.toString()));
+ }
+
+
+ static boolean argumentsContainNoLocal(final FieldTable args)
+ {
+ return args != null
+ && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue())
+ && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue()));
+ }
+
+
+ static boolean argumentsContainJMSSelector(final Map<String,Object> args)
+ {
+ return args != null && (args.get(AMQPFilterTypes.JMS_SELECTOR.toString()) instanceof String)
+ && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0;
+ }
+
+
+ static boolean argumentsContainJMSSelector(final FieldTable args)
+ {
+ return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())
+ && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0);
+ }
+
+
+ static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
+ {
+ if(argumentsContainNoLocal(args))
+ {
+ MessageFilter filter = new NoLocalFilter(queue);
+
+ if(argumentsContainJMSSelector(args))
+ {
+ filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+ }
+ return filter;
+ }
+ else
+ {
+ return createJMSSelectorFilter(args);
+ }
+ }
+
+ static MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
+ {
+ if(argumentsContainNoLocal(args))
+ {
+ MessageFilter filter = new NoLocalFilter(queue);
+
+ if(argumentsContainJMSSelector(args))
+ {
+ filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+ }
+ return filter;
+ }
+ else
+ {
+ return createJMSSelectorFilter(args);
+ }
+ }
+
+ static final class NoLocalFilter implements MessageFilter
+ {
+ private final AMQQueue _queue;
+
+ public NoLocalFilter(AMQQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public boolean matches(Filterable message)
+ {
+ InboundMessage inbound = (InboundMessage) message;
+ final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
+ return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
+
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ NoLocalFilter that = (NoLocalFilter) o;
+
+ return _queue == null ? that._queue == null : _queue.equals(that._queue);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _queue != null ? _queue.hashCode() : 0;
+ }
+ }
+
+ static final class CompoundFilter implements MessageFilter
+ {
+ private MessageFilter _noLocalFilter;
+ private MessageFilter _jmsSelectorFilter;
+
+ public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
+ {
+ _noLocalFilter = filter;
+ _jmsSelectorFilter = jmsSelectorFilter;
+ }
+
+ public boolean matches(Filterable message)
+ {
+ return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ CompoundFilter that = (CompoundFilter) o;
+
+ if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
+ {
+ return false;
+ }
+ if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
+ result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
+ return result;
+ }
+ }
+}
Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Sat Jun 1 19:24:36 2013
@@ -22,15 +22,19 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.framing.AMQTypedValue;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.AMQMessageHeader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.queue.Filterable;
/**
* Defines binding and matching based on a set of headers.
@@ -44,13 +48,14 @@ class HeadersBinding
private final Set<String> required = new HashSet<String>();
private final Map<String,Object> matches = new HashMap<String,Object>();
private boolean matchAny;
+ private MessageFilter _filter;
/**
* Creates a header binding for a set of mappings. Those mappings whose value is
* null or the empty string are assumed only to be required headers, with
* no constraint on the value. Those with a non-null value are assumed to
* define a required match of value.
- *
+ *
* @param binding the binding to create a header binding using
*/
public HeadersBinding(Binding binding)
@@ -66,9 +71,30 @@ class HeadersBinding
_mappings = null;
}
}
-
+
private void initMappings()
{
+ // Build the optional message filter (selector etc.) carried in the binding arguments.
+ if(FilterSupport.argumentsContainFilter(_mappings))
+ {
+     try
+     {
+         _filter = FilterSupport.createMessageFilter(_mappings,_binding.getQueue());
+     }
+     catch (AMQInvalidArgumentException e)
+     {
+         // Hand the exception to the logger as well, so the reason the filter
+         // could not be created is recorded alongside the binding details.
+         _logger.warn("Invalid filter in binding queue '"+_binding.getQueue().getName()
+                      +"' to exchange '"+_binding.getExchange().getName()
+                      +"' with arguments: " + _binding.getArguments(), e);
+         // Fail closed: a binding whose filter cannot be built matches no message.
+         _filter = new MessageFilter()
+         {
+             @Override
+             public boolean matches(Filterable message)
+             {
+                 return false;
+             }
+         };
+     }
+ }
for(Map.Entry<String, Object> entry : _mappings.entrySet())
{
String propertyName = entry.getKey();
@@ -87,7 +113,7 @@ class HeadersBinding
}
}
}
-
+
public Binding getBinding()
{
return _binding;
@@ -111,6 +137,11 @@ class HeadersBinding
}
}
+ /**
+  * Tests an inbound message against this binding.
+  *
+  * The message headers are checked first via {@code matches(AMQMessageHeader)};
+  * only if they match is the optional {@code _filter} consulted. A binding
+  * with no filter ({@code _filter == null}) matches on headers alone.
+  *
+  * @param message the message being routed
+  * @return true if the headers match and any filter present accepts the message
+  */
+ public boolean matches(InboundMessage message)
+ {
+     if(!matches(message.getMessageHeader()))
+     {
+         return false;
+     }
+     if(_filter == null)
+     {
+         return true;
+     }
+     return _filter.matches(message);
+ }
+
private boolean and(AMQMessageHeader headers)
{
if(headers.containsHeaders(required))
@@ -215,7 +246,7 @@ class HeadersBinding
{
return key.startsWith("X-") || key.startsWith("x-");
}
-
+
@Override
public boolean equals(final Object o)
{
@@ -250,4 +281,4 @@ class HeadersBinding
return true;
}
-}
\ No newline at end of file
+}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Sat Jun 1 19:24:36 2013
@@ -69,14 +69,14 @@ public class HeadersExchange extends Abs
{
private static final Logger _logger = Logger.getLogger(HeadersExchange.class);
-
+
private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey =
new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>();
-
+
private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers =
new CopyOnWriteArrayList<HeadersBinding>();
-
+
public static final ExchangeType<HeadersExchange> TYPE = new HeadersExchangeType();
public HeadersExchange()
@@ -87,112 +87,31 @@ public class HeadersExchange extends Abs
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
- AMQMessageHeader header = payload.getMessageHeader();
if (_logger.isDebugEnabled())
{
- _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header);
+ _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + payload.getMessageHeader());
}
-
+
LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>();
-
+
for (HeadersBinding hb : _bindingHeaderMatchers)
{
- if (hb.matches(header))
+ if (hb.matches(payload))
{
Binding b = hb.getBinding();
-
+
b.incrementMatches();
-
+
if (_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " +
- header + " to " + b.getQueue().getNameShortString());
+ payload.getMessageHeader() + " to " + b.getQueue().getNameShortString());
}
queues.add(b.getQueue());
}
}
-
- return new ArrayList<BaseQueue>(queues);
- }
-
-
- public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
- {
- CopyOnWriteArraySet<Binding> bindings;
- if(bindingKey == null)
- {
- bindings = new CopyOnWriteArraySet<Binding>(getBindings());
- }
- else
- {
- bindings = _bindingsByKey.get(bindingKey);
- }
-
- if(bindings != null)
- {
- for(Binding binding : bindings)
- {
- if(queue == null || binding.getQueue().equals(queue))
- {
- return arguments == null ? binding.getArguments() == null : binding.getArguments().equals(arguments);
- }
- }
- }
-
- return false;
- }
-
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- //fixme isBound here should take the arguements in to consideration.
- return isBound(routingKey, queue);
- }
-
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
-
- if(bindings != null)
- {
- for(Binding binding : bindings)
- {
- if(binding.getQueue().equals(queue))
- {
- return true;
- }
- }
- }
-
- return false;
- }
-
- public boolean isBound(AMQShortString routingKey)
- {
- String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
- return bindings != null && !bindings.isEmpty();
- }
-
- public boolean isBound(AMQQueue queue)
- {
- for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values())
- {
- for(Binding binding : bindings)
- {
- if(binding.getQueue().equals(queue))
- {
- return true;
- }
- }
- }
-
- return false;
- }
- public boolean hasBindings()
- {
- return !getBindings().isEmpty();
+ return new ArrayList<BaseQueue>(queues);
}
protected void onBind(final Binding binding)
@@ -216,7 +135,7 @@ public class HeadersExchange extends Abs
bindings = newBindings;
}
}
-
+
if(_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() +
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Sat Jun 1 19:24:36 2013
@@ -20,21 +20,15 @@
*/
package org.apache.qpid.server.exchange;
-import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.filter.SelectorParsingException;
-import org.apache.qpid.filter.selector.ParseException;
-import org.apache.qpid.filter.selector.TokenMgrError;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
@@ -42,14 +36,10 @@ import org.apache.qpid.server.exchange.t
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.exchange.topic.TopicNormalizer;
import org.apache.qpid.server.exchange.topic.TopicParser;
-import org.apache.qpid.server.filter.JMSSelectorFilter;
-import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.Filterable;
public class TopicExchange extends AbstractExchange
{
@@ -65,8 +55,6 @@ public class TopicExchange extends Abstr
private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
- private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
-
public TopicExchange()
{
super(TYPE);
@@ -77,7 +65,7 @@ public class TopicExchange extends Abstr
AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ;
AMQQueue queue = binding.getQueue();
FieldTable args = FieldTable.convertToFieldTable(binding.getArguments());
-
+
assert queue != null;
assert rKey != null;
@@ -91,26 +79,26 @@ public class TopicExchange extends Abstr
FieldTable oldArgs = _bindings.get(binding);
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
- if(argumentsContainFilter(args))
+ if(FilterSupport.argumentsContainFilter(args))
{
- if(argumentsContainFilter(oldArgs))
+ if(FilterSupport.argumentsContainFilter(oldArgs))
{
result.replaceQueueFilter(queue,
- createMessageFilter(oldArgs, queue),
- createMessageFilter(args, queue));
+ FilterSupport.createMessageFilter(oldArgs, queue),
+ FilterSupport.createMessageFilter(args, queue));
}
else
{
- result.addFilteredQueue(queue, createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
result.removeUnfilteredQueue(queue);
}
}
else
{
- if(argumentsContainFilter(oldArgs))
+ if(FilterSupport.argumentsContainFilter(oldArgs))
{
result.addUnfilteredQueue(queue);
- result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue));
+ result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue));
}
else
{
@@ -118,7 +106,7 @@ public class TopicExchange extends Abstr
return;
}
}
-
+
result.addBinding(binding);
}
@@ -129,9 +117,9 @@ public class TopicExchange extends Abstr
if(result == null)
{
result = new TopicExchangeResult();
- if(argumentsContainFilter(args))
+ if(FilterSupport.argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
}
else
{
@@ -142,89 +130,22 @@ public class TopicExchange extends Abstr
}
else
{
- if(argumentsContainFilter(args))
+ if(FilterSupport.argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
}
else
{
result.addUnfilteredQueue(queue);
}
}
-
+
result.addBinding(binding);
_bindings.put(binding, args);
}
}
- private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
- {
- if(argumentsContainNoLocal(args))
- {
- MessageFilter filter = new NoLocalFilter(queue);
-
- if(argumentsContainJMSSelector(args))
- {
- filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
- }
- return filter;
- }
- else
- {
- return createJMSSelectorFilter(args);
- }
-
- }
-
-
- private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException
- {
- final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
- WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
- JMSSelectorFilter selector = null;
-
- if(selectorRef == null || (selector = selectorRef.get())==null)
- {
- try
- {
- selector = new JMSSelectorFilter(selectorString);
- }
- catch (ParseException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- catch (SelectorParsingException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- catch (TokenMgrError e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
- }
- return selector;
- }
-
- private static boolean argumentsContainFilter(final FieldTable args)
- {
- return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
- }
-
- private static boolean argumentsContainNoLocal(final FieldTable args)
- {
- return args != null
- && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue())
- && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue()));
- }
-
- private static boolean argumentsContainJMSSelector(final FieldTable args)
- {
- return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())
- && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0);
- }
-
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
@@ -256,87 +177,6 @@ public class TopicExchange extends Abstr
}
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments));
-
- if (arguments == null)
- {
- return _bindings.containsKey(binding);
- }
- else
- {
- FieldTable o = _bindings.get(binding);
- if (o != null)
- {
- return o.equals(arguments);
- }
- else
- {
- return false;
- }
-
- }
- }
-
- public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
- {
- Binding binding = new Binding(null, bindingKey, queue, this, arguments);
- if (arguments == null)
- {
- return _bindings.containsKey(binding);
- }
- else
- {
- FieldTable o = _bindings.get(binding);
- if (o != null)
- {
- return arguments.equals(FieldTable.convertToMap(o));
- }
- else
- {
- return false;
- }
- }
-
- }
-
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- return isBound(routingKey, null, queue);
- }
-
- public boolean isBound(AMQShortString routingKey)
- {
- for(Binding b : _bindings.keySet())
- {
- if(b.getBindingKey().equals(routingKey.toString()))
- {
- return true;
- }
- }
-
- return false;
- }
-
- public boolean isBound(AMQQueue queue)
- {
- for(Binding b : _bindings.keySet())
- {
- if(b.getQueue().equals(queue))
- {
- return true;
- }
- }
-
- return false;
- }
-
- public boolean hasBindings()
- {
- return !_bindings.isEmpty();
- }
-
private boolean deregisterQueue(final Binding binding)
{
if(_bindings.containsKey(binding))
@@ -344,14 +184,15 @@ public class TopicExchange extends Abstr
FieldTable bindingArgs = _bindings.remove(binding);
AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
-
+
result.removeBinding(binding);
-
- if(argumentsContainFilter(bindingArgs))
+
+ if(FilterSupport.argumentsContainFilter(bindingArgs))
{
try
{
- result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue()));
+ result.removeFilteredQueue(binding.getQueue(), FilterSupport.createMessageFilter(bindingArgs,
+ binding.getQueue()));
}
catch (AMQInvalidArgumentException e)
{
@@ -418,96 +259,4 @@ public class TopicExchange extends Abstr
deregisterQueue(binding);
}
- private static final class NoLocalFilter implements MessageFilter
- {
- private final AMQQueue _queue;
-
- public NoLocalFilter(AMQQueue queue)
- {
- _queue = queue;
- }
-
- public boolean matches(Filterable message)
- {
- InboundMessage inbound = (InboundMessage) message;
- final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
- return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
-
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
-
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- NoLocalFilter that = (NoLocalFilter) o;
-
- return _queue == null ? that._queue == null : _queue.equals(that._queue);
- }
-
- @Override
- public int hashCode()
- {
- return _queue != null ? _queue.hashCode() : 0;
- }
- }
-
- private static final class CompoundFilter implements MessageFilter
- {
- private MessageFilter _noLocalFilter;
- private MessageFilter _jmsSelectorFilter;
-
- public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
- {
- _noLocalFilter = filter;
- _jmsSelectorFilter = jmsSelectorFilter;
- }
-
- public boolean matches(Filterable message)
- {
- return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- CompoundFilter that = (CompoundFilter) o;
-
- if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
- {
- return false;
- }
- if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
- {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
- result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
- return result;
- }
- }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Sat Jun 1 19:24:36 2013
@@ -159,9 +159,15 @@ public class ExchangeBoundHandler implem
else
{
+ String message = "Queue " + queueName + " not bound with routing key " +
+                  body.getRoutingKey() + " to exchange " + exchangeName;
+
+ // The reply text is sent as a short string. The guard lets text of exactly
+ // 255 characters through untouched, so longer text is cut to that same
+ // length rather than to 254.
+ if(message.length()>255)
+ {
+     message = message.substring(0,255);
+ }
response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
- new AMQShortString("Queue " + queueName + " not bound with routing key " +
- body.getRoutingKey() + " to exchange " + exchangeName)); // replyText
+ new AMQShortString(message)); // replyText
}
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Sat Jun 1 19:24:36 2013
@@ -1130,22 +1130,22 @@ public class ServerSessionDelegate exten
if(queueMatched)
{
- result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
+ final boolean keyMatched = exchange.isBound(method.getBindingKey(), queue);
+ result.setKeyNotMatched(!keyMatched);
+ if(method.hasArguments() && keyMatched)
+ {
+ result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), queue));
+ }
}
else
{
result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
}
- if(method.hasArguments())
- {
- result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queueMatched ? queue : null));
- }
-
}
else if (method.hasArguments())
{
- result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), queueMatched ? queue : null));
+ result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue));
}
}
@@ -1166,7 +1166,7 @@ public class ServerSessionDelegate exten
{
if(method.hasArguments())
{
- result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), null));
+ result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments()));
}
result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java Sat Jun 1 19:24:36 2013
@@ -21,22 +21,32 @@
package org.apache.qpid.server.exchange;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anySet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
import java.util.UUID;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class FanoutExchangeTest extends TestCase
{
@@ -51,7 +61,9 @@ public class FanoutExchangeTest extends
_virtualHost = mock(VirtualHost.class);
SecurityManager securityManager = mock(SecurityManager.class);
when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
- when(securityManager.authoriseBind(any(Exchange.class),any(AMQQueue.class),any(AMQShortString.class))).thenReturn(true);
+ when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true);
+ when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true);
+
_exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
}
@@ -76,14 +88,14 @@ public class FanoutExchangeTest extends
{
AMQQueue queue = bindQueue();
assertTrue("Should return true for a bound queue",
- _exchange.isBound((AMQShortString) null, (FieldTable) null, queue));
+ _exchange.isBound(new AMQShortString("matters"), (FieldTable) null, queue));
}
public void testIsBoundAMQShortStringAMQQueue() throws AMQSecurityException, AMQInternalException
{
AMQQueue queue = bindQueue();
assertTrue("Should return true for a bound queue",
- _exchange.isBound((AMQShortString) null, queue));
+ _exchange.isBound(new AMQShortString("matters"), queue));
}
public void testIsBoundAMQQueue() throws AMQSecurityException, AMQInternalException
@@ -95,9 +107,86 @@ public class FanoutExchangeTest extends
private AMQQueue bindQueue() throws AMQSecurityException, AMQInternalException
{
+ AMQQueue queue = mockQueue();
+ _exchange.addBinding("matters", queue, null);
+ return queue;
+ }
+
+ private AMQQueue mockQueue()
+ {
AMQQueue queue = mock(AMQQueue.class);
when(queue.getVirtualHost()).thenReturn(_virtualHost);
- _exchange.addBinding("does not matter", queue, null);
return queue;
}
+
+ /**
+  * Routes mocked messages through the exchange while bindings with and
+  * without JMS selector arguments are added and removed, checking after
+  * each change which queues the message is routed to.
+  */
+ public void testRoutingWithSelectors() throws Exception
+ {
+     final String selectorArgument = AMQPFilterTypes.JMS_SELECTOR.toString();
+     final AMQQueue queue1 = mockQueue();
+     final AMQQueue queue2 = mockQueue();
+
+     // Phase 1: both queues bound without arguments.
+     _exchange.addBinding("key",queue1, null);
+     _exchange.addBinding("key",queue2, null);
+
+     final List<? extends BaseQueue> unfiltered = _exchange.route(mockMessage(true));
+
+     assertEquals("Expected message to be routed to both queues", 2, unfiltered.size());
+     assertTrue("Expected queue1 to be routed to", unfiltered.contains(queue1));
+     assertTrue("Expected queue2 to be routed to", unfiltered.contains(queue2));
+
+     // Phase 2: queue2 gains a second binding ("key2") carrying the selector "select = True".
+     _exchange.addBinding("key2",queue2, Collections.singletonMap(selectorArgument,(Object)"select = True"));
+
+     final List<? extends BaseQueue> withExtraBinding = _exchange.route(mockMessage(true));
+
+     assertEquals("Expected message to be routed to both queues", 2, withExtraBinding.size());
+     assertTrue("Expected queue1 to be routed to", withExtraBinding.contains(queue1));
+     assertTrue("Expected queue2 to be routed to", withExtraBinding.contains(queue2));
+
+     // Phase 3: queue2's argument-less binding is removed; only the "key2" selector binding remains for it.
+     _exchange.removeBinding("key",queue2,null);
+
+     final List<? extends BaseQueue> selectorMatching = _exchange.route(mockMessage(true));
+
+     assertEquals("Expected message to be routed to both queues", 2, selectorMatching.size());
+     assertTrue("Expected queue1 to be routed to", selectorMatching.contains(queue1));
+     assertTrue("Expected queue2 to be routed to", selectorMatching.contains(queue2));
+
+     // Same bindings, but the message now carries select=false.
+     final List<? extends BaseQueue> selectorNotMatching = _exchange.route(mockMessage(false));
+
+     assertEquals("Expected message to be routed to queue1 only", 1, selectorNotMatching.size());
+     assertTrue("Expected queue1 to be routed to", selectorNotMatching.contains(queue1));
+     assertFalse("Expected queue2 not to be routed to", selectorNotMatching.contains(queue2));
+
+     // Phase 4: queue2 is bound again under "key", this time with the selector "select = False".
+     _exchange.addBinding("key",queue2, Collections.singletonMap(selectorArgument,(Object)"select = False"));
+
+     final List<? extends BaseQueue> secondSelector = _exchange.route(mockMessage(false));
+     assertEquals("Expected message to be routed to both queues", 2, secondSelector.size());
+     assertTrue("Expected queue1 to be routed to", secondSelector.contains(queue1));
+     assertTrue("Expected queue2 to be routed to", secondSelector.contains(queue2));
+ }
+
+ /**
+  * Creates a mocked inbound message whose header exposes a single header
+  * named "select" with the given value.
+  *
+  * @param val value returned by the mocked header for "select"
+  * @return a mocked message whose {@code getMessageHeader()} returns that header
+  */
+ private InboundMessage mockMessage(boolean val)
+ {
+     final AMQMessageHeader header = mock(AMQMessageHeader.class);
+     when(header.containsHeader("select")).thenReturn(true);
+     when(header.getHeader("select")).thenReturn(val);
+     when(header.getHeaderNames()).thenReturn(Collections.singleton("select"));
+     // containsHeaders(...) answers true only when asked about exactly the one name "select".
+     when(header.containsHeaders(anySet())).then(new Answer<Boolean>()
+     {
+         @Override
+         public Boolean answer(InvocationOnMock invocation) throws Throwable
+         {
+             // Wildcard instead of a raw Set: only size() and contains() are needed.
+             final Set<?> names = (Set<?>) invocation.getArguments()[0];
+             return names.size() == 1 && names.contains("select");
+         }
+     });
+     final InboundMessage inboundMessage = mock(InboundMessage.class);
+     when(inboundMessage.getMessageHeader()).thenReturn(header);
+     return inboundMessage;
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Sat Jun 1 19:24:36 2013
@@ -20,106 +20,230 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.server.util.BrokerTestHelper;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import junit.framework.TestCase;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
-public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
+public class HeadersExchangeTest extends TestCase
{
- private AMQProtocolSession _protocolSession;
+ private HeadersExchange _exchange;
+ private VirtualHost _virtualHost;
@Override
public void setUp() throws Exception
{
super.setUp();
- BrokerTestHelper.setUp();
- _protocolSession = new InternalTestProtocolSession(getVirtualHost(), BrokerTestHelper.createBrokerMock());
+
+ CurrentActor.setDefault(mock(LogActor.class));
+ _exchange = new HeadersExchange();
+ _virtualHost = mock(VirtualHost.class);
+ SecurityManager securityManager = mock(SecurityManager.class);
+ when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
+ when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true);
+ when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true);
+
+ _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
+
}
- @Override
- public void tearDown() throws Exception
+ protected void routeAndTest(InboundMessage msg, AMQQueue... expected) throws Exception
+ {
+ List<? extends BaseQueue> results = _exchange.route(msg);
+ List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
+ unexpected.removeAll(Arrays.asList(expected));
+ assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());
+ List<? extends BaseQueue> missing = new ArrayList<BaseQueue>(Arrays.asList(expected));
+ missing.removeAll(results);
+ assertTrue("Message not delivered to expected queues: " + missing, missing.isEmpty());
+ assertTrue("Duplicates " + results, results.size()==(new HashSet<BaseQueue>(results)).size());
+ }
+
+
+ private AMQQueue createAndBind(final String name, String... arguments)
+ throws Exception
+ {
+ return createAndBind(name, getArgsMapFromStrings(arguments));
+ }
+
+ private Map<String, Object> getArgsMapFromStrings(String... arguments)
+ {
+ Map<String, Object> map = new HashMap<String,Object>();
+
+ for(String arg : arguments)
+ {
+ if(arg.contains("="))
+ {
+ String[] keyValue = arg.split("=",2);
+ map.put(keyValue[0],keyValue[1]);
+ }
+ else
+ {
+ map.put(arg,null);
+ }
+ }
+ return map;
+ }
+
+ private AMQQueue createAndBind(final String name, Map<String, Object> arguments)
+ throws Exception
+ {
+ AMQQueue q = create(name);
+ bind(name, arguments, q);
+ return q;
+ }
+
+ private void bind(String bindingKey, Map<String, Object> arguments, AMQQueue q)
+ throws AMQSecurityException, AMQInternalException
+ {
+ _exchange.addBinding(bindingKey,q,arguments);
+ }
+
+ private AMQQueue create(String name)
+ {
+ AMQQueue q = mock(AMQQueue.class);
+ when(q.toString()).thenReturn(name);
+ when(q.getVirtualHost()).thenReturn(_virtualHost);
+ return q;
+ }
+
+
+ public void testSimple() throws Exception
+ {
+ AMQQueue q1 = createAndBind("Q1", "F0000");
+ AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark");
+ AMQQueue q3 = createAndBind("Q3", "F0001");
+ AMQQueue q4 = createAndBind("Q4", "F0001=Bear");
+ AMQQueue q5 = createAndBind("Q5", "F0000", "F0001");
+ AMQQueue q6 = createAndBind("Q6", "F0000=Aardvark", "F0001=Bear");
+ AMQQueue q7 = createAndBind("Q7", "F0000", "F0001=Bear");
+ AMQQueue q8 = createAndBind("Q8", "F0000=Aardvark", "F0001");
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q5, q8);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q3, q4, q5, q7);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")),
+ q1, q2, q3, q4, q5, q6, q7, q8);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0002")));
+
+ }
+
+ public void testAny() throws Exception
+ {
+ AMQQueue q1 = createAndBind("Q1", "F0000", "F0001", "X-match=any");
+ AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark", "F0001=Bear", "X-match=any");
+ AMQQueue q3 = createAndBind("Q3", "F0000", "F0001=Bear", "X-match=any");
+ AMQQueue q4 = createAndBind("Q4", "F0000=Aardvark", "F0001", "X-match=any");
+ AMQQueue q5 = createAndBind("Q5", "F0000=Apple", "F0001", "X-match=any");
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1, q3);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2, q3, q4);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q4, q5);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q2, q3, q4, q5);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), q1, q2, q3, q4, q5);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0002")));
+ }
+
+ public void testOnUnbind() throws Exception
+ {
+ AMQQueue q1 = createAndBind("Q1", "F0000");
+ AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark");
+ AMQQueue q3 = createAndBind("Q3", "F0001");
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0001")), q3);
+
+ _exchange.removeBinding("Q1",q1,getArgsMapFromStrings("F0000"));
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000")));
+ routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q2);
+ }
+
+
+ public void testWithSelectors() throws Exception
{
- BrokerTestHelper.tearDown();
- super.tearDown();
+ AMQQueue q1 = create("Q1");
+ AMQQueue q2 = create("Q2");
+ bind("q1",getArgsMapFromStrings("F"), q1);
+ bind("q1select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q1);
+ bind("q2",getArgsMapFromStrings("F=1"), q2);
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F")),q1);
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2);
+
+
+ AMQQueue q3 = create("Q3");
+ bind("q3select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q3);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2,q3);
+ routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1);
+ bind("q3select2",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='2'"), q3);
+
+ routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1,q3);
+
}
- public void testSimple() throws AMQException
+ private InboundMessage mockMessage(final Map<String, Object> headerValues)
{
- TestQueue q1 = bindDefault("F0000");
- TestQueue q2 = bindDefault("F0000=Aardvark");
- TestQueue q3 = bindDefault("F0001");
- TestQueue q4 = bindDefault("F0001=Bear");
- TestQueue q5 = bindDefault("F0000", "F0001");
- TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear");
- TestQueue q7 = bindDefault("F0000", "F0001=Bear");
- TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
-
- routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1);
- routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2);
- routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
- routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
- routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"),
- q1, q2, q3, q4, q5, q6, q7, q8);
- routeAndTest(new Message(_protocolSession, "Message6", "F0002"));
-
- Message m7 = new Message(_protocolSession, "Message7", "XXXXX");
-
- MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo());
- pb7.setMandatory(true);
- routeAndTest(m7,true);
-
- Message m8 = new Message(_protocolSession, "Message8", "F0000");
- MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo());
- pb8.setMandatory(true);
- routeAndTest(m8,false,q1);
-
-
- }
-
- public void testAny() throws AMQException
- {
- TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any");
- TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any");
- TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any");
- TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
- TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
-
- routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1, q3);
- routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2, q3, q4);
- routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
- routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message(_protocolSession, "Message6", "F0002"));
- }
-
- public void testMandatory() throws AMQException
- {
- bindDefault("F0000");
- Message m1 = new Message(_protocolSession, "Message1", "XXXXX");
- Message m2 = new Message(_protocolSession, "Message2", "F0000");
- MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo());
- pb1.setMandatory(true);
- MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo());
- pb2.setMandatory(true);
- routeAndTest(m1,true);
- }
-
- public void testOnUnbind() throws AMQException
- {
- TestQueue q1 = bindDefault("F0000");
- TestQueue q2 = bindDefault("F0000=Aardvark");
- TestQueue q3 = bindDefault("F0001");
-
- routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1);
- routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2);
- routeAndTest(new Message(_protocolSession, "Message3", "F0001"), q3);
-
- unbind(q1,"F0000");
- routeAndTest(new Message(_protocolSession, "Message4", "F0000"));
- routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark"), q2);
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(header.containsHeader(anyString())).then(new Answer<Boolean>()
+ {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable
+ {
+ return headerValues.containsKey((String) invocation.getArguments()[0]);
+ }
+ });
+ when(header.getHeader(anyString())).then(new Answer<Object>()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ return headerValues.get((String) invocation.getArguments()[0]);
+ }
+ });
+ when(header.getHeaderNames()).thenReturn(headerValues.keySet());
+ when(header.containsHeaders(anySet())).then(new Answer<Boolean>()
+ {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable
+ {
+ final Set names = (Set) invocation.getArguments()[0];
+ return headerValues.keySet().containsAll(names);
+
+ }
+ });
+ final InboundMessage inboundMessage = mock(InboundMessage.class);
+ when(inboundMessage.getMessageHeader()).thenReturn(header);
+ return inboundMessage;
}
-
public static junit.framework.Test suite()
{
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java Sat Jun 1 19:24:36 2013
@@ -31,7 +31,7 @@ public class AMQHeadersExchange extends
{
public AMQHeadersExchange(BindingURL binding)
{
- this(binding.getExchangeName());
+ super(binding);
}
public AMQHeadersExchange(String name)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org