You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/15 16:25:16 UTC
svn commit: r1666810 - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/java/org/apache/qpid/server/exchange/topic/
broker-core/src/main/java/org/apache/qpid/server/filter/
broker-core/src/main/j...
Author: rgodfrey
Date: Sun Mar 15 15:25:15 2015
New Revision: 1666810
URL: http://svn.apache.org/r1666810
Log:
QPID-6452 : Defer creating filters in default filter map until a consumer is added. Optimize filters which start at the tail of the queue.
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Sun Mar 15 15:25:15 2015
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
@@ -58,7 +59,7 @@ public class DirectExchange extends Abst
{
private CopyOnWriteArraySet<BindingImpl> _bindings = new CopyOnWriteArraySet<BindingImpl>();
private List<BaseQueue> _unfilteredQueues = new ArrayList<BaseQueue>();
- private Map<BaseQueue, MessageFilter> _filteredQueues = new HashMap<BaseQueue, MessageFilter>();
+ private Map<BaseQueue, FilterManager> _filteredQueues = new HashMap<>();
public synchronized void addBinding(BindingImpl binding)
{
@@ -80,7 +81,7 @@ public class DirectExchange extends Abst
private void recalculateQueues()
{
List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
- Map<BaseQueue, MessageFilter> filteredQueues = new HashMap<BaseQueue,MessageFilter>();
+ Map<BaseQueue, FilterManager> filteredQueues = new HashMap<>();
for(BindingImpl b : _bindings)
{
@@ -89,7 +90,7 @@ public class DirectExchange extends Abst
{
try
{
- MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getAMQQueue());
+ FilterManager filter = FilterSupport.createMessageFilter(b.getArguments(), b.getAMQQueue());
filteredQueues.put(b.getAMQQueue(),filter);
}
catch (AMQInvalidArgumentException e)
@@ -129,7 +130,7 @@ public class DirectExchange extends Abst
return !_filteredQueues.isEmpty();
}
- public Map<BaseQueue,MessageFilter> getFilteredQueues()
+ public Map<BaseQueue,FilterManager> getFilteredQueues()
{
return _filteredQueues;
}
@@ -159,14 +160,15 @@ public class DirectExchange extends Abst
if(bindings.hasFilteredQueues())
{
Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues);
+ Filterable filterable = Filterable.Factory.newInstance(payload, instanceProperties);
- Map<BaseQueue, MessageFilter> filteredQueues = bindings.getFilteredQueues();
- for(Map.Entry<BaseQueue, MessageFilter> entry : filteredQueues.entrySet())
+ Map<BaseQueue, FilterManager> filteredQueues = bindings.getFilteredQueues();
+ for(Map.Entry<BaseQueue, FilterManager> entry : filteredQueues.entrySet())
{
if(!queuesSet.contains(entry.getKey()))
{
- MessageFilter filter = entry.getValue();
- if(filter.matches(Filterable.Factory.newInstance(payload, instanceProperties)))
+ FilterManager filter = entry.getValue();
+ if(filter.allAllow(filterable))
{
queuesSet.add(entry.getKey());
}
@@ -174,7 +176,7 @@ public class DirectExchange extends Abst
}
if(queues.size() != queuesSet.size())
{
- queues = new ArrayList<BaseQueue>(queuesSet);
+ queues = new ArrayList<>(queuesSet);
}
}
return queues;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Sun Mar 15 15:25:15 2015
@@ -33,9 +33,9 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ManagedObject;
@@ -58,10 +58,10 @@ public class FanoutExchange extends Abst
private final CopyOnWriteArrayList<AMQQueue> _unfilteredQueues = new CopyOnWriteArrayList<AMQQueue>();
private final CopyOnWriteArrayList<AMQQueue> _filteredQueues = new CopyOnWriteArrayList<AMQQueue>();
- private final AtomicReference<Map<AMQQueue,Map<BindingImpl, MessageFilter>>> _filteredBindings =
- new AtomicReference<Map<AMQQueue,Map<BindingImpl, MessageFilter>>>();
+ private final AtomicReference<Map<AMQQueue,Map<BindingImpl, FilterManager>>> _filteredBindings =
+ new AtomicReference<>();
{
- Map<AMQQueue,Map<BindingImpl, MessageFilter>> emptyMap = Collections.emptyMap();
+ Map<AMQQueue,Map<BindingImpl, FilterManager>> emptyMap = Collections.emptyMap();
_filteredBindings.set(emptyMap);
}
@@ -85,17 +85,17 @@ public class FanoutExchange extends Abst
final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues);
- final Map<AMQQueue, Map<BindingImpl, MessageFilter>> filteredBindings = _filteredBindings.get();
+ final Map<AMQQueue, Map<BindingImpl, FilterManager>> filteredBindings = _filteredBindings.get();
if(!_filteredQueues.isEmpty())
{
for(AMQQueue q : _filteredQueues)
{
- final Map<BindingImpl, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q);
+ final Map<BindingImpl, FilterManager> bindingMessageFilterMap = filteredBindings.get(q);
if(!(bindingMessageFilterMap == null || result.contains(q)))
{
- for(MessageFilter filter : bindingMessageFilterMap.values())
+ for(FilterManager filter : bindingMessageFilterMap.values())
{
- if(filter.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
+ if(filter.allAllow(Filterable.Factory.newInstance(payload, instanceProperties)))
{
result.add(q);
break;
@@ -143,12 +143,12 @@ public class FanoutExchange extends Abst
}
else
{
- HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
- new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
+ HashMap<AMQQueue,Map<BindingImpl, FilterManager>> filteredBindings =
+ new HashMap<>(_filteredBindings.get());
- Map<BindingImpl,MessageFilter> bindingsForQueue;
+ Map<BindingImpl,FilterManager> bindingsForQueue;
- final MessageFilter messageFilter;
+ final FilterManager messageFilter;
try
{
@@ -163,11 +163,11 @@ public class FanoutExchange extends Abst
if (oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments))
{
- bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(filteredBindings.remove(binding.getAMQQueue()));
+ bindingsForQueue = new HashMap<>(filteredBindings.remove(binding.getAMQQueue()));
}
else // previously unfiltered
{
- bindingsForQueue = new HashMap<BindingImpl,MessageFilter>();
+ bindingsForQueue = new HashMap<BindingImpl,FilterManager>();
Integer oldValue = _queues.remove(queue);
if (ONE.equals(oldValue))
@@ -217,16 +217,16 @@ public class FanoutExchange extends Abst
try
{
- HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
- new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
+ HashMap<AMQQueue,Map<BindingImpl, FilterManager>> filteredBindings =
+ new HashMap<>(_filteredBindings.get());
- Map<BindingImpl, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
- final MessageFilter messageFilter =
+ Map<BindingImpl, FilterManager> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
+ final FilterManager messageFilter =
FilterSupport.createMessageFilter(binding.getArguments(), binding.getAMQQueue());
if(bindingsForQueue != null)
{
- bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(bindingsForQueue);
+ bindingsForQueue = new HashMap<>(bindingsForQueue);
bindingsForQueue.put(binding, messageFilter);
}
else
@@ -278,13 +278,13 @@ public class FanoutExchange extends Abst
}
else // we are removing a binding with filters
{
- HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
- new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
+ HashMap<AMQQueue,Map<BindingImpl, FilterManager>> filteredBindings =
+ new HashMap<>(_filteredBindings.get());
- Map<BindingImpl,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
+ Map<BindingImpl,FilterManager> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
if(bindingsForQueue.size()>1)
{
- bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(bindingsForQueue);
+ bindingsForQueue = new HashMap<>(bindingsForQueue);
bindingsForQueue.remove(binding);
filteredBindings.put(binding.getAMQQueue(),bindingsForQueue);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Sun Mar 15 15:25:15 2015
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
@@ -48,7 +49,7 @@ class HeadersBinding
private final Set<String> required = new HashSet<String>();
private final Map<String,Object> matches = new HashMap<String,Object>();
private boolean matchAny;
- private MessageFilter _filter;
+ private FilterManager _filter;
/**
* Creates a header binding for a set of mappings. Those mappings whose value is
@@ -86,7 +87,8 @@ class HeadersBinding
_logger.warn("Invalid filter in binding queue '"+_binding.getAMQQueue().getName()
+"' to exchange '"+_binding.getExchange().getName()
+"' with arguments: " + _binding.getArguments());
- _filter = new MessageFilter()
+ _filter = new FilterManager();
+ _filter.add("x-exclude-all",new MessageFilter()
{
@Override
public String getName()
@@ -94,12 +96,18 @@ class HeadersBinding
return "";
}
- @Override
+ @Override
+ public boolean startAtTail()
+ {
+ return false;
+ }
+
+ @Override
public boolean matches(Filterable message)
{
return false;
}
- };
+ });
}
}
for(Map.Entry<String, Object> entry : _mappings.entrySet())
@@ -146,7 +154,7 @@ class HeadersBinding
public boolean matches(Filterable message)
{
- return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message));
+ return matches(message.getMessageHeader()) && (_filter == null || _filter.allAllow(message));
}
private boolean and(AMQMessageHeader headers)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java Sun Mar 15 15:25:15 2015
@@ -31,15 +31,15 @@ import java.util.concurrent.ConcurrentMa
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.queue.AMQQueue;
public final class TopicExchangeResult implements TopicMatcherResult
{
private final List<BindingImpl> _bindings = new CopyOnWriteArrayList<BindingImpl>();
private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
- private final ConcurrentMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
+ private final ConcurrentMap<AMQQueue, Map<FilterManager,Integer>> _filteredQueues = new ConcurrentHashMap<>();
private volatile ArrayList<AMQQueue> _unfilteredQueueList = new ArrayList<AMQQueue>(0);
public void addUnfilteredQueue(AMQQueue queue)
@@ -93,15 +93,15 @@ public final class TopicExchangeResult i
public List<BindingImpl> getBindings()
{
- return new ArrayList<BindingImpl>(_bindings);
+ return new ArrayList<>(_bindings);
}
- public void addFilteredQueue(AMQQueue queue, MessageFilter filter)
+ public void addFilteredQueue(AMQQueue queue, FilterManager filter)
{
- Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
+ Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
if(filters == null)
{
- filters = new ConcurrentHashMap<MessageFilter,Integer>();
+ filters = new ConcurrentHashMap<>();
_filteredQueues.put(queue, filters);
}
Integer instances = filters.get(filter);
@@ -116,9 +116,9 @@ public final class TopicExchangeResult i
}
- public void removeFilteredQueue(AMQQueue queue, MessageFilter filter)
+ public void removeFilteredQueue(AMQQueue queue, FilterManager filter)
{
- Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
+ Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
if(filters != null)
{
Integer instances = filters.get(filter);
@@ -143,11 +143,11 @@ public final class TopicExchangeResult i
}
public void replaceQueueFilter(AMQQueue queue,
- MessageFilter oldFilter,
- MessageFilter newFilter)
+ FilterManager oldFilter,
+ FilterManager newFilter)
{
- Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
- Map<MessageFilter,Integer> newFilters = new ConcurrentHashMap<MessageFilter,Integer>(filters);
+ Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
+ Map<FilterManager,Integer> newFilters = new ConcurrentHashMap<>(filters);
Integer oldFilterInstances = filters.get(oldFilter);
if(oldFilterInstances == 1)
{
@@ -190,13 +190,13 @@ public final class TopicExchangeResult i
queues.addAll(_unfilteredQueues.keySet());
if(!_filteredQueues.isEmpty())
{
- for(Map.Entry<AMQQueue, Map<MessageFilter, Integer>> entry : _filteredQueues.entrySet())
+ for(Map.Entry<AMQQueue, Map<FilterManager, Integer>> entry : _filteredQueues.entrySet())
{
if(!queues.contains(entry.getKey()))
{
- for(MessageFilter filter : entry.getValue().keySet())
+ for(FilterManager filter : entry.getValue().keySet())
{
- if(filter.matches(msg))
+ if(filter.allAllow(msg))
{
queues.add(entry.getKey());
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java Sun Mar 15 15:25:15 2015
@@ -25,10 +25,12 @@ import org.apache.qpid.common.AMQPFilter
public final class ArrivalTimeFilter implements MessageFilter
{
private final long _startingFrom;
+ private final boolean _startAtTail;
- public ArrivalTimeFilter(final long startingFrom)
+ public ArrivalTimeFilter(final long startingFrom, final boolean startAtTail)
{
_startingFrom = startingFrom;
+ _startAtTail = startAtTail;
}
@Override
@@ -38,9 +40,38 @@ public final class ArrivalTimeFilter imp
}
@Override
+ public boolean startAtTail()
+ {
+ return _startAtTail;
+ }
+
+ @Override
public boolean matches(final Filterable message)
{
- return message.getArrivalTime() >= _startingFrom;
+ return message.getArrivalTime() >= _startingFrom;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final ArrivalTimeFilter that = (ArrivalTimeFilter) o;
+
+ return _startingFrom == that._startingFrom;
+
}
+ @Override
+ public int hashCode()
+ {
+ return (int) (_startingFrom ^ (_startingFrom >>> 32));
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java Sun Mar 15 15:25:15 2015
@@ -40,7 +40,7 @@ public final class ArrivalTimeFilterFact
String arg = arguments.get(0);
long startingFrom= Long.parseLong(arg);
- return new ArrivalTimeFilter(startingFrom);
+ return new ArrivalTimeFilter(System.currentTimeMillis() + startingFrom, startingFrom==0l);
}
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java Sun Mar 15 15:25:15 2015
@@ -50,6 +50,18 @@ public class FilterManager
return true;
}
+ public boolean startAtTail()
+ {
+ for(MessageFilter filter : _filters.values())
+ {
+ if(filter.startAtTail())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
public Iterator<MessageFilter> filters()
{
return _filters.values().iterator();
@@ -65,11 +77,42 @@ public class FilterManager
return _filters.containsKey(name);
}
+ public boolean hasFilter(final MessageFilter filter)
+ {
+ return _filters.containsValue(filter);
+ }
+
@Override
public String toString()
{
return _filters.toString();
}
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final FilterManager that = (FilterManager) o;
+
+ if (_filters != null ? !_filters.equals(that._filters) : that._filters != null)
+ {
+ return false;
+ }
+
+ return true;
+ }
+ @Override
+ public int hashCode()
+ {
+ return _filters != null ? _filters.hashCode() : 0;
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Sun Mar 15 15:25:15 2015
@@ -95,22 +95,25 @@ public class FilterSupport
&& ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0;
}
- public static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
+ public static FilterManager createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
{
+ FilterManager filterManager = null;
if(argumentsContainNoLocal(args))
{
- MessageFilter filter = new NoLocalFilter(queue);
+ filterManager = new FilterManager();
+ filterManager.add(AMQPFilterTypes.NO_LOCAL.toString(), new NoLocalFilter(queue));
+ }
- if(argumentsContainJMSSelector(args))
+ if(argumentsContainJMSSelector(args))
+ {
+ if(filterManager == null)
{
- filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+ filterManager = new FilterManager();
}
- return filter;
- }
- else
- {
- return createJMSSelectorFilter(args);
+ filterManager.add(AMQPFilterTypes.JMS_SELECTOR.toString(),createJMSSelectorFilter(args));
}
+ return filterManager;
+
}
@PluggableService
@@ -144,6 +147,12 @@ public class FilterSupport
}
@Override
+ public boolean startAtTail()
+ {
+ return false;
+ }
+
+ @Override
public boolean equals(Object o)
{
if (this == o)
@@ -170,62 +179,4 @@ public class FilterSupport
}
- static final class CompoundFilter implements MessageFilter
- {
- private MessageFilter _noLocalFilter;
- private MessageFilter _jmsSelectorFilter;
-
- public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
- {
- _noLocalFilter = filter;
- _jmsSelectorFilter = jmsSelectorFilter;
- }
-
- @Override
- public String getName()
- {
- return "";
- }
-
- public boolean matches(Filterable message)
- {
- return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- CompoundFilter that = (CompoundFilter) o;
-
- if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
- {
- return false;
- }
- if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
- {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
- result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
- return result;
- }
-
-
- }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java Sun Mar 15 15:25:15 2015
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.server.filter;
+import org.apache.qpid.filter.FilterableMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-public interface Filterable
+public interface Filterable extends FilterableMessage
{
AMQMessageHeader getMessageHeader();
@@ -81,6 +82,55 @@ public interface Filterable
{
return message.getArrivalTime();
}
+
+ @Override
+ public String getReplyTo()
+ {
+ return message.getMessageHeader().getReplyTo();
+ }
+
+ @Override
+ public String getType()
+ {
+ return message.getMessageHeader().getType();
+ }
+
+ @Override
+ public byte getPriority()
+ {
+ return message.getMessageHeader().getPriority();
+ }
+
+ @Override
+ public String getMessageId()
+ {
+ return message.getMessageHeader().getMessageId();
+ }
+
+ @Override
+ public long getTimestamp()
+ {
+ return message.getMessageHeader().getTimestamp();
+ }
+
+ @Override
+ public String getCorrelationId()
+ {
+ return message.getMessageHeader().getCorrelationId();
+ }
+
+ @Override
+ public long getExpiration()
+ {
+ return message.getMessageHeader().getExpiration();
+ }
+
+ @Override
+ public Object getHeader(String name)
+ {
+ return message.getMessageHeader().getHeader(name);
+ }
+
};
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Sun Mar 15 15:25:15 2015
@@ -60,7 +60,7 @@ public class JMSSelectorFilter implement
public boolean matches(Filterable message)
{
- boolean match = _matcher.matches(wrap(message));
+ boolean match = _matcher.matches(message);
if(_logger.isDebugEnabled())
{
_logger.debug(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
@@ -68,60 +68,10 @@ public class JMSSelectorFilter implement
return match;
}
- private FilterableMessage wrap(final Filterable message)
+ @Override
+ public boolean startAtTail()
{
- return new FilterableMessage()
- {
- public boolean isPersistent()
- {
- return message.isPersistent();
- }
-
- public boolean isRedelivered()
- {
- return message.isRedelivered();
- }
-
- public Object getHeader(String name)
- {
- return message.getMessageHeader().getHeader(name);
- }
-
- public String getReplyTo()
- {
- return message.getMessageHeader().getReplyTo();
- }
-
- public String getType()
- {
- return message.getMessageHeader().getType();
- }
-
- public byte getPriority()
- {
- return message.getMessageHeader().getPriority();
- }
-
- public String getMessageId()
- {
- return message.getMessageHeader().getMessageId();
- }
-
- public long getTimestamp()
- {
- return message.getMessageHeader().getTimestamp();
- }
-
- public String getCorrelationId()
- {
- return message.getMessageHeader().getCorrelationId();
- }
-
- public long getExpiration()
- {
- return message.getMessageHeader().getExpiration();
- }
- };
+ return false;
}
public String getSelector()
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Sun Mar 15 15:25:15 2015
@@ -24,4 +24,6 @@ public interface MessageFilter
{
String getName();
boolean matches(Filterable message);
+ boolean startAtTail();
+
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sun Mar 15 15:25:15 2015
@@ -57,6 +57,7 @@ import org.apache.qpid.server.connection
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.EventLogger;
@@ -262,7 +263,7 @@ public abstract class AbstractQueue<X ex
private final QueueRunner _queueRunner = new QueueRunner(this);
private boolean _closing;
- private final ConcurrentMap<String,MessageFilter> _defaultFiltersMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String,Task<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
{
@@ -462,11 +463,20 @@ public abstract class AbstractQueue<X ex
if(filterValue.size() == 1)
{
String filterTypeName = String.valueOf(filterValue.keySet().iterator().next());
- MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName);
+ final MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName);
if(filterFactory != null)
{
- List<String> filterArguments = filterValue.values().iterator().next();
- _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments));
+ final List<String> filterArguments = filterValue.values().iterator().next();
+ // check the arguments are valid
+ filterFactory.newInstance(filterArguments);
+ _defaultFiltersMap.put(name, new Task<MessageFilter>()
+ {
+ @Override
+ public MessageFilter execute()
+ {
+ return filterFactory.newInstance(filterArguments);
+ }
+ });
}
else
{
@@ -786,11 +796,11 @@ public abstract class AbstractQueue<X ex
{
filters = new FilterManager();
}
- for (Map.Entry<String,MessageFilter> filter : _defaultFiltersMap.entrySet())
+ for (Map.Entry<String,Task<MessageFilter>> filter : _defaultFiltersMap.entrySet())
{
if(!filters.hasFilter(filter.getKey()))
{
- filters.add(filter.getKey(), filter.getValue());
+ filters.add(filter.getKey(), filter.getValue().execute());
}
}
}
@@ -823,7 +833,16 @@ public abstract class AbstractQueue<X ex
}
consumer.setStateListener(this);
- consumer.setQueueContext(new QueueContext(getEntries().getHead()));
+ QueueContext queueContext;
+ if(filters == null || !filters.startAtTail())
+ {
+ queueContext = new QueueContext(getEntries().getHead());
+ }
+ else
+ {
+ queueContext = new QueueContext(getEntries().getTail());
+ }
+ consumer.setQueueContext(queueContext);
if (!isDeleted())
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Sun Mar 15 15:25:15 2015
@@ -160,6 +160,12 @@ public abstract class OrderedQueueEntryL
return _head;
}
+ @Override
+ public QueueEntry getTail()
+ {
+ return _tail;
+ }
+
public void entryDeleted(QueueEntry queueEntry)
{
QueueEntry next = _head.getNextNode();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Sun Mar 15 15:25:15 2015
@@ -185,6 +185,14 @@ abstract public class PriorityQueueList
}
@Override
+ public PriorityQueueEntry getTail()
+ {
+ return (PriorityQueueEntry) _priorityLists[0].getTail();
+ }
+
+
+
+ @Override
public void entryDeleted(final QueueEntry queueEntry)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Sun Mar 15 15:25:15 2015
@@ -34,6 +34,8 @@ public interface QueueEntryList
QueueEntry getHead();
+ QueueEntry getTail();
+
QueueEntry getOldestEntry();
void entryDeleted(QueueEntry queueEntry);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Sun Mar 15 15:25:15 2015
@@ -319,6 +319,19 @@ public class SortedQueueEntryList implem
return _head;
}
+
+ public SortedQueueEntry getTail()
+ {
+ SortedQueueEntry current = _head;
+ SortedQueueEntry next;
+ while((next = next(current))!=null)
+ {
+ current = next;
+ }
+ return current;
+ }
+
+
@Override
public QueueEntry getOldestEntry()
{
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Sun Mar 15 15:25:15 2015
@@ -113,6 +113,12 @@ public class MockConsumer implements Con
}
@Override
+ public boolean startAtTail()
+ {
+ return false;
+ }
+
+ @Override
public boolean matches(final Filterable message)
{
final String messageId = message.getMessageHeader().getMessageId();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sun Mar 15 15:25:15 2015
@@ -294,7 +294,7 @@ public class ServerSessionDelegate exten
{
filterManager = new FilterManager();
}
- MessageFilter filter = new ArrivalTimeFilter(startingFrom);
+ MessageFilter filter = new ArrivalTimeFilter(startingFrom, period == 0);
filterManager.add(filter.getName(), filter);
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Mar 15 15:25:15 2015
@@ -755,6 +755,12 @@ public class AMQChannel
{
return message.getConnectionReference() != connectionReference;
}
+
+ @Override
+ public boolean startAtTail()
+ {
+ return false;
+ }
};
filterManager.add(filter.getName(), filter);
}
@@ -788,7 +794,7 @@ public class AMQChannel
{
filterManager = new FilterManager();
}
- MessageFilter filter = new ArrivalTimeFilter(startingFrom);
+ MessageFilter filter = new ArrivalTimeFilter(startingFrom, period==0);
filterManager.add(filter.getName(), filter);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org