You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/15 16:25:16 UTC

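svn commit: r1666810 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/exchange/topic/ broker-core/src/main/java/org/apache/qpid/server/filter/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/test/java/org/apache/qpid/server/consumer/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/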

Author: rgodfrey
Date: Sun Mar 15 15:25:15 2015
New Revision: 1666810

URL: http://svn.apache.org/r1666810
Log:
QPID-6452 : Defer creating filters in default filter map until a consumer is added. Optimize filters which start at the tail of the queue.

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Sun Mar 15 15:25:15 2015
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.binding.BindingImpl;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.filter.MessageFilter;
@@ -58,7 +59,7 @@ public class DirectExchange extends Abst
     {
         private CopyOnWriteArraySet<BindingImpl> _bindings = new CopyOnWriteArraySet<BindingImpl>();
         private List<BaseQueue> _unfilteredQueues = new ArrayList<BaseQueue>();
-        private Map<BaseQueue, MessageFilter> _filteredQueues = new HashMap<BaseQueue, MessageFilter>();
+        private Map<BaseQueue, FilterManager> _filteredQueues = new HashMap<>();
 
         public synchronized void addBinding(BindingImpl binding)
         {
@@ -80,7 +81,7 @@ public class DirectExchange extends Abst
         private void recalculateQueues()
         {
             List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
-            Map<BaseQueue, MessageFilter> filteredQueues = new HashMap<BaseQueue,MessageFilter>();
+            Map<BaseQueue, FilterManager> filteredQueues = new HashMap<>();
 
             for(BindingImpl b : _bindings)
             {
@@ -89,7 +90,7 @@ public class DirectExchange extends Abst
                 {
                     try
                     {
-                        MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getAMQQueue());
+                        FilterManager filter = FilterSupport.createMessageFilter(b.getArguments(), b.getAMQQueue());
                         filteredQueues.put(b.getAMQQueue(),filter);
                     }
                     catch (AMQInvalidArgumentException e)
@@ -129,7 +130,7 @@ public class DirectExchange extends Abst
             return !_filteredQueues.isEmpty();
         }
 
-        public Map<BaseQueue,MessageFilter> getFilteredQueues()
+        public Map<BaseQueue,FilterManager> getFilteredQueues()
         {
             return _filteredQueues;
         }
@@ -159,14 +160,15 @@ public class DirectExchange extends Abst
             if(bindings.hasFilteredQueues())
             {
                 Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues);
+                Filterable filterable = Filterable.Factory.newInstance(payload, instanceProperties);
 
-                Map<BaseQueue, MessageFilter> filteredQueues = bindings.getFilteredQueues();
-                for(Map.Entry<BaseQueue, MessageFilter> entry : filteredQueues.entrySet())
+                Map<BaseQueue, FilterManager> filteredQueues = bindings.getFilteredQueues();
+                for(Map.Entry<BaseQueue, FilterManager> entry : filteredQueues.entrySet())
                 {
                     if(!queuesSet.contains(entry.getKey()))
                     {
-                        MessageFilter filter = entry.getValue();
-                        if(filter.matches(Filterable.Factory.newInstance(payload, instanceProperties)))
+                        FilterManager filter = entry.getValue();
+                        if(filter.allAllow(filterable))
                         {
                             queuesSet.add(entry.getKey());
                         }
@@ -174,7 +176,7 @@ public class DirectExchange extends Abst
                 }
                 if(queues.size() != queuesSet.size())
                 {
-                    queues = new ArrayList<BaseQueue>(queuesSet);
+                    queues = new ArrayList<>(queuesSet);
                 }
             }
             return queues;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Sun Mar 15 15:25:15 2015
@@ -33,9 +33,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.binding.BindingImpl;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.ManagedObject;
@@ -58,10 +58,10 @@ public class FanoutExchange extends Abst
     private final CopyOnWriteArrayList<AMQQueue> _unfilteredQueues = new CopyOnWriteArrayList<AMQQueue>();
     private final CopyOnWriteArrayList<AMQQueue> _filteredQueues = new CopyOnWriteArrayList<AMQQueue>();
 
-    private final AtomicReference<Map<AMQQueue,Map<BindingImpl, MessageFilter>>>  _filteredBindings =
-            new AtomicReference<Map<AMQQueue,Map<BindingImpl, MessageFilter>>>();
+    private final AtomicReference<Map<AMQQueue,Map<BindingImpl, FilterManager>>>  _filteredBindings =
+            new AtomicReference<>();
     {
-        Map<AMQQueue,Map<BindingImpl, MessageFilter>> emptyMap = Collections.emptyMap();
+        Map<AMQQueue,Map<BindingImpl, FilterManager>> emptyMap = Collections.emptyMap();
         _filteredBindings.set(emptyMap);
     }
 
@@ -85,17 +85,17 @@ public class FanoutExchange extends Abst
         final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues);
 
 
-        final Map<AMQQueue, Map<BindingImpl, MessageFilter>> filteredBindings = _filteredBindings.get();
+        final Map<AMQQueue, Map<BindingImpl, FilterManager>> filteredBindings = _filteredBindings.get();
         if(!_filteredQueues.isEmpty())
         {
             for(AMQQueue q : _filteredQueues)
             {
-                final Map<BindingImpl, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q);
+                final Map<BindingImpl, FilterManager> bindingMessageFilterMap = filteredBindings.get(q);
                 if(!(bindingMessageFilterMap == null || result.contains(q)))
                 {
-                    for(MessageFilter filter : bindingMessageFilterMap.values())
+                    for(FilterManager filter : bindingMessageFilterMap.values())
                     {
-                        if(filter.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
+                        if(filter.allAllow(Filterable.Factory.newInstance(payload, instanceProperties)))
                         {
                             result.add(q);
                             break;
@@ -143,12 +143,12 @@ public class FanoutExchange extends Abst
         }
         else
         {
-            HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
-                    new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
+            HashMap<AMQQueue,Map<BindingImpl, FilterManager>> filteredBindings =
+                    new HashMap<>(_filteredBindings.get());
 
-            Map<BindingImpl,MessageFilter> bindingsForQueue;
+            Map<BindingImpl,FilterManager> bindingsForQueue;
 
-            final MessageFilter messageFilter;
+            final FilterManager messageFilter;
 
             try
             {
@@ -163,11 +163,11 @@ public class FanoutExchange extends Abst
 
             if (oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments))
             {
-                bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(filteredBindings.remove(binding.getAMQQueue()));
+                bindingsForQueue = new HashMap<>(filteredBindings.remove(binding.getAMQQueue()));
             }
             else // previously unfiltered
             {
-                bindingsForQueue = new HashMap<BindingImpl,MessageFilter>();
+                bindingsForQueue = new HashMap<BindingImpl,FilterManager>();
 
                 Integer oldValue = _queues.remove(queue);
                 if (ONE.equals(oldValue))
@@ -217,16 +217,16 @@ public class FanoutExchange extends Abst
             try
             {
 
-                HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
-                        new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
+                HashMap<AMQQueue,Map<BindingImpl, FilterManager>> filteredBindings =
+                        new HashMap<>(_filteredBindings.get());
 
-                Map<BindingImpl, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
-                final MessageFilter messageFilter =
+                Map<BindingImpl, FilterManager> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
+                final FilterManager messageFilter =
                         FilterSupport.createMessageFilter(binding.getArguments(), binding.getAMQQueue());
 
                 if(bindingsForQueue != null)
                 {
-                    bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(bindingsForQueue);
+                    bindingsForQueue = new HashMap<>(bindingsForQueue);
                     bindingsForQueue.put(binding, messageFilter);
                 }
                 else
@@ -278,13 +278,13 @@ public class FanoutExchange extends Abst
         }
         else // we are removing a binding with filters
         {
-            HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
-                    new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
+            HashMap<AMQQueue,Map<BindingImpl, FilterManager>> filteredBindings =
+                    new HashMap<>(_filteredBindings.get());
 
-            Map<BindingImpl,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
+            Map<BindingImpl,FilterManager> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
             if(bindingsForQueue.size()>1)
             {
-                bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(bindingsForQueue);
+                bindingsForQueue = new HashMap<>(bindingsForQueue);
                 bindingsForQueue.remove(binding);
                 filteredBindings.put(binding.getAMQQueue(),bindingsForQueue);
             }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Sun Mar 15 15:25:15 2015
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.binding.BindingImpl;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.filter.MessageFilter;
@@ -48,7 +49,7 @@ class HeadersBinding
     private final Set<String> required = new HashSet<String>();
     private final Map<String,Object> matches = new HashMap<String,Object>();
     private boolean matchAny;
-    private MessageFilter _filter;
+    private FilterManager _filter;
 
     /**
      * Creates a header binding for a set of mappings. Those mappings whose value is
@@ -86,7 +87,8 @@ class HeadersBinding
                 _logger.warn("Invalid filter in binding queue '"+_binding.getAMQQueue().getName()
                              +"' to exchange '"+_binding.getExchange().getName()
                              +"' with arguments: " + _binding.getArguments());
-                _filter = new MessageFilter()
+                _filter = new FilterManager();
+                _filter.add("x-exclude-all",new MessageFilter()
                     {
                     @Override
                         public String getName()
@@ -94,12 +96,18 @@ class HeadersBinding
                             return "";
                         }
 
-                        @Override
+                    @Override
+                    public boolean startAtTail()
+                    {
+                        return false;
+                    }
+
+                    @Override
                         public boolean matches(Filterable message)
                         {
                             return false;
                         }
-                    };
+                    });
             }
         }
         for(Map.Entry<String, Object> entry : _mappings.entrySet())
@@ -146,7 +154,7 @@ class HeadersBinding
 
     public boolean matches(Filterable message)
     {
-        return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message));
+        return matches(message.getMessageHeader()) && (_filter == null || _filter.allAllow(message));
     }
 
     private boolean and(AMQMessageHeader headers)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java Sun Mar 15 15:25:15 2015
@@ -31,15 +31,15 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.queue.AMQQueue;
 
 public final class TopicExchangeResult implements TopicMatcherResult
 {
     private final List<BindingImpl> _bindings = new CopyOnWriteArrayList<BindingImpl>();
     private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
-    private final ConcurrentMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
+    private final ConcurrentMap<AMQQueue, Map<FilterManager,Integer>> _filteredQueues = new ConcurrentHashMap<>();
     private volatile ArrayList<AMQQueue> _unfilteredQueueList = new ArrayList<AMQQueue>(0);
 
     public void addUnfilteredQueue(AMQQueue queue)
@@ -93,15 +93,15 @@ public final class TopicExchangeResult i
     
     public List<BindingImpl> getBindings()
     {
-        return new ArrayList<BindingImpl>(_bindings);
+        return new ArrayList<>(_bindings);
     }
 
-    public void addFilteredQueue(AMQQueue queue, MessageFilter filter)
+    public void addFilteredQueue(AMQQueue queue, FilterManager filter)
     {
-        Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
+        Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
         if(filters == null)
         {
-            filters = new ConcurrentHashMap<MessageFilter,Integer>();
+            filters = new ConcurrentHashMap<>();
             _filteredQueues.put(queue, filters);
         }
         Integer instances = filters.get(filter);
@@ -116,9 +116,9 @@ public final class TopicExchangeResult i
 
     }
 
-    public void removeFilteredQueue(AMQQueue queue, MessageFilter filter)
+    public void removeFilteredQueue(AMQQueue queue, FilterManager filter)
     {
-        Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
+        Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
         if(filters != null)
         {
             Integer instances = filters.get(filter);
@@ -143,11 +143,11 @@ public final class TopicExchangeResult i
     }
 
     public void replaceQueueFilter(AMQQueue queue,
-                                   MessageFilter oldFilter,
-                                   MessageFilter newFilter)
+                                   FilterManager oldFilter,
+                                   FilterManager newFilter)
     {
-        Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
-        Map<MessageFilter,Integer> newFilters = new ConcurrentHashMap<MessageFilter,Integer>(filters);
+        Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
+        Map<FilterManager,Integer> newFilters = new ConcurrentHashMap<>(filters);
         Integer oldFilterInstances = filters.get(oldFilter);
         if(oldFilterInstances == 1)
         {
@@ -190,13 +190,13 @@ public final class TopicExchangeResult i
         queues.addAll(_unfilteredQueues.keySet());
         if(!_filteredQueues.isEmpty())
         {
-            for(Map.Entry<AMQQueue, Map<MessageFilter, Integer>> entry : _filteredQueues.entrySet())
+            for(Map.Entry<AMQQueue, Map<FilterManager, Integer>> entry : _filteredQueues.entrySet())
             {
                 if(!queues.contains(entry.getKey()))
                 {
-                    for(MessageFilter filter : entry.getValue().keySet())
+                    for(FilterManager filter : entry.getValue().keySet())
                     {
-                        if(filter.matches(msg))
+                        if(filter.allAllow(msg))
                         {
                             queues.add(entry.getKey());
                         }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java Sun Mar 15 15:25:15 2015
@@ -25,10 +25,12 @@ import org.apache.qpid.common.AMQPFilter
 public final class ArrivalTimeFilter implements MessageFilter
 {
     private final long _startingFrom;
+    private final boolean _startAtTail;
 
-    public ArrivalTimeFilter(final long startingFrom)
+    public ArrivalTimeFilter(final long startingFrom, final boolean startAtTail)
     {
         _startingFrom = startingFrom;
+        _startAtTail = startAtTail;
     }
 
     @Override
@@ -38,9 +40,38 @@ public final class ArrivalTimeFilter imp
     }
 
     @Override
+    public boolean startAtTail()
+    {
+        return _startAtTail;
+    }
+
+    @Override
     public boolean matches(final Filterable message)
     {
-        return message.getArrivalTime() >= _startingFrom;
+        return message.getArrivalTime() >=  _startingFrom;
+    }
+
+    @Override
+    public boolean equals(final Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        final ArrivalTimeFilter that = (ArrivalTimeFilter) o;
+
+        return _startingFrom == that._startingFrom;
+
     }
 
+    @Override
+    public int hashCode()
+    {
+        return (int) (_startingFrom ^ (_startingFrom >>> 32));
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java Sun Mar 15 15:25:15 2015
@@ -40,7 +40,7 @@ public final class ArrivalTimeFilterFact
         String arg = arguments.get(0);
         long startingFrom= Long.parseLong(arg);
 
-        return new ArrivalTimeFilter(startingFrom);
+        return new ArrivalTimeFilter(System.currentTimeMillis() + startingFrom, startingFrom==0l);
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java Sun Mar 15 15:25:15 2015
@@ -50,6 +50,18 @@ public class FilterManager
         return true;
     }
 
+    public boolean startAtTail()
+    {
+        for(MessageFilter filter : _filters.values())
+        {
+            if(filter.startAtTail())
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public Iterator<MessageFilter> filters()
     {
         return _filters.values().iterator();
@@ -65,11 +77,42 @@ public class FilterManager
         return _filters.containsKey(name);
     }
 
+    public boolean hasFilter(final MessageFilter filter)
+    {
+        return _filters.containsValue(filter);
+    }
+
     @Override
     public String toString()
     {
         return _filters.toString();
     }
 
+    @Override
+    public boolean equals(final Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        final FilterManager that = (FilterManager) o;
+
+        if (_filters != null ? !_filters.equals(that._filters) : that._filters != null)
+        {
+            return false;
+        }
+
+        return true;
+    }
 
+    @Override
+    public int hashCode()
+    {
+        return _filters != null ? _filters.hashCode() : 0;
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Sun Mar 15 15:25:15 2015
@@ -95,22 +95,25 @@ public class FilterSupport
                        && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0;
     }
 
-    public static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
+    public static FilterManager createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
     {
+        FilterManager filterManager = null;
         if(argumentsContainNoLocal(args))
         {
-            MessageFilter filter = new NoLocalFilter(queue);
+            filterManager = new FilterManager();
+            filterManager.add(AMQPFilterTypes.NO_LOCAL.toString(), new NoLocalFilter(queue));
+        }
 
-            if(argumentsContainJMSSelector(args))
+        if(argumentsContainJMSSelector(args))
+        {
+            if(filterManager == null)
             {
-                filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+                filterManager = new FilterManager();
             }
-            return filter;
-        }
-        else
-        {
-            return createJMSSelectorFilter(args);
+            filterManager.add(AMQPFilterTypes.JMS_SELECTOR.toString(),createJMSSelectorFilter(args));
         }
+        return filterManager;
+
     }
 
     @PluggableService
@@ -144,6 +147,12 @@ public class FilterSupport
         }
 
         @Override
+        public boolean startAtTail()
+        {
+            return false;
+        }
+
+        @Override
         public boolean equals(Object o)
         {
             if (this == o)
@@ -170,62 +179,4 @@ public class FilterSupport
 
     }
 
-    static final class CompoundFilter implements MessageFilter
-    {
-        private MessageFilter _noLocalFilter;
-        private MessageFilter _jmsSelectorFilter;
-
-        public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
-        {
-            _noLocalFilter = filter;
-            _jmsSelectorFilter = jmsSelectorFilter;
-        }
-
-        @Override
-        public String getName()
-        {
-            return "";
-        }
-
-        public boolean matches(Filterable message)
-        {
-            return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o)
-            {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass())
-            {
-                return false;
-            }
-
-            CompoundFilter that = (CompoundFilter) o;
-
-            if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
-            {
-                return false;
-            }
-            if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
-            {
-                return false;
-            }
-
-            return true;
-        }
-
-        @Override
-        public int hashCode()
-        {
-            int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
-            result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
-            return result;
-        }
-
-
-    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java Sun Mar 15 15:25:15 2015
@@ -20,11 +20,12 @@
 */
 package org.apache.qpid.server.filter;
 
+import org.apache.qpid.filter.FilterableMessage;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
 
-public interface Filterable
+public interface Filterable extends FilterableMessage
 {
     AMQMessageHeader getMessageHeader();
 
@@ -81,6 +82,55 @@ public interface Filterable
                 {
                     return message.getArrivalTime();
                 }
+
+                @Override
+                public String getReplyTo()
+                {
+                    return message.getMessageHeader().getReplyTo();
+                }
+
+                @Override
+                public String getType()
+                {
+                    return message.getMessageHeader().getType();
+                }
+
+                @Override
+                public byte getPriority()
+                {
+                    return message.getMessageHeader().getPriority();
+                }
+
+                @Override
+                public String getMessageId()
+                {
+                    return message.getMessageHeader().getMessageId();
+                }
+
+                @Override
+                public long getTimestamp()
+                {
+                    return message.getMessageHeader().getTimestamp();
+                }
+
+                @Override
+                public String getCorrelationId()
+                {
+                    return message.getMessageHeader().getCorrelationId();
+                }
+
+                @Override
+                public long getExpiration()
+                {
+                    return message.getMessageHeader().getExpiration();
+                }
+
+                @Override
+                public Object getHeader(String name)
+                {
+                    return message.getMessageHeader().getHeader(name);
+                }
+
             };
         }
     }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Sun Mar 15 15:25:15 2015
@@ -60,7 +60,7 @@ public class JMSSelectorFilter implement
     public boolean matches(Filterable message)
     {
 
-        boolean match = _matcher.matches(wrap(message));
+        boolean match = _matcher.matches(message);
         if(_logger.isDebugEnabled())
         {
             _logger.debug(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
@@ -68,60 +68,10 @@ public class JMSSelectorFilter implement
         return match;
     }
 
-    private FilterableMessage wrap(final Filterable message)
+    @Override
+    public boolean startAtTail()
     {
-        return new FilterableMessage()
-        {
-            public boolean isPersistent()
-            {
-                return message.isPersistent();
-            }
-
-            public boolean isRedelivered()
-            {
-                return message.isRedelivered();
-            }
-
-            public Object getHeader(String name)
-            {
-                return message.getMessageHeader().getHeader(name);
-            }
-
-            public String getReplyTo()
-            {
-                return message.getMessageHeader().getReplyTo();
-            }
-
-            public String getType()
-            {
-                return message.getMessageHeader().getType();
-            }
-
-            public byte getPriority()
-            {
-                return message.getMessageHeader().getPriority();
-            }
-
-            public String getMessageId()
-            {
-                return message.getMessageHeader().getMessageId();
-            }
-
-            public long getTimestamp()
-            {
-                return message.getMessageHeader().getTimestamp();
-            }
-
-            public String getCorrelationId()
-            {
-                return message.getMessageHeader().getCorrelationId();
-            }
-
-            public long getExpiration()
-            {
-                return message.getMessageHeader().getExpiration();
-            }
-        };
+        return false;
     }
 
     public String getSelector()

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Sun Mar 15 15:25:15 2015
@@ -24,4 +24,6 @@ public interface MessageFilter
 {
     String getName();
     boolean matches(Filterable message);
+    boolean startAtTail();
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sun Mar 15 15:25:15 2015
@@ -57,6 +57,7 @@ import org.apache.qpid.server.connection
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.filter.ArrivalTimeFilter;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.logging.EventLogger;
@@ -262,7 +263,7 @@ public abstract class AbstractQueue<X ex
 
     private final QueueRunner _queueRunner = new QueueRunner(this);
     private boolean _closing;
-    private final ConcurrentMap<String,MessageFilter> _defaultFiltersMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String,Task<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
 
     protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
     {
@@ -462,11 +463,20 @@ public abstract class AbstractQueue<X ex
                 if(filterValue.size() == 1)
                 {
                     String filterTypeName = String.valueOf(filterValue.keySet().iterator().next());
-                    MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName);
+                    final MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName);
                     if(filterFactory != null)
                     {
-                        List<String> filterArguments = filterValue.values().iterator().next();
-                        _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments));
+                        final List<String> filterArguments = filterValue.values().iterator().next();
+                        // check the arguments are valid
+                        filterFactory.newInstance(filterArguments);
+                        _defaultFiltersMap.put(name, new Task<MessageFilter>()
+                        {
+                            @Override
+                            public MessageFilter execute()
+                            {
+                                return filterFactory.newInstance(filterArguments);
+                            }
+                        });
                     }
                     else
                     {
@@ -786,11 +796,11 @@ public abstract class AbstractQueue<X ex
             {
                 filters = new FilterManager();
             }
-            for (Map.Entry<String,MessageFilter> filter : _defaultFiltersMap.entrySet())
+            for (Map.Entry<String,Task<MessageFilter>> filter : _defaultFiltersMap.entrySet())
             {
                 if(!filters.hasFilter(filter.getKey()))
                 {
-                    filters.add(filter.getKey(), filter.getValue());
+                    filters.add(filter.getKey(), filter.getValue().execute());
                 }
             }
         }
@@ -823,7 +833,16 @@ public abstract class AbstractQueue<X ex
         }
 
         consumer.setStateListener(this);
-        consumer.setQueueContext(new QueueContext(getEntries().getHead()));
+        QueueContext queueContext;
+        if(filters == null || !filters.startAtTail())
+        {
+            queueContext = new QueueContext(getEntries().getHead());
+        }
+        else
+        {
+            queueContext = new QueueContext(getEntries().getTail());
+        }
+        consumer.setQueueContext(queueContext);
 
         if (!isDeleted())
         {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Sun Mar 15 15:25:15 2015
@@ -160,6 +160,12 @@ public abstract class OrderedQueueEntryL
         return _head;
     }
 
+    @Override
+    public QueueEntry getTail()
+    {
+        return _tail;
+    }
+
     public void entryDeleted(QueueEntry queueEntry)
     {
         QueueEntry next = _head.getNextNode();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Sun Mar 15 15:25:15 2015
@@ -185,6 +185,14 @@ abstract public class PriorityQueueList
         }
 
         @Override
+        public PriorityQueueEntry getTail()
+        {
+            return (PriorityQueueEntry) _priorityLists[0].getTail();
+        }
+
+
+
+        @Override
         public void entryDeleted(final QueueEntry queueEntry)
         {
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Sun Mar 15 15:25:15 2015
@@ -34,6 +34,8 @@ public interface QueueEntryList
 
     QueueEntry getHead();
 
+    QueueEntry getTail();
+
     QueueEntry getOldestEntry();
 
     void entryDeleted(QueueEntry queueEntry);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Sun Mar 15 15:25:15 2015
@@ -319,6 +319,19 @@ public class SortedQueueEntryList implem
         return _head;
     }
 
+
+    public SortedQueueEntry getTail()
+    {
+        SortedQueueEntry current = _head;
+        SortedQueueEntry next;
+        while((next = next(current))!=null)
+        {
+            current = next;
+        }
+        return current;
+    }
+
+
     @Override
     public QueueEntry getOldestEntry()
     {

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Sun Mar 15 15:25:15 2015
@@ -113,6 +113,12 @@ public class MockConsumer implements Con
                 }
 
                 @Override
+                public boolean startAtTail()
+                {
+                    return false;
+                }
+
+                @Override
                 public boolean matches(final Filterable message)
                 {
                     final String messageId = message.getMessageHeader().getMessageId();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sun Mar 15 15:25:15 2015
@@ -294,7 +294,7 @@ public class ServerSessionDelegate exten
                         {
                             filterManager = new FilterManager();
                         }
-                        MessageFilter filter = new ArrivalTimeFilter(startingFrom);
+                        MessageFilter filter = new ArrivalTimeFilter(startingFrom, period == 0);
                         filterManager.add(filter.getName(), filter);
 
                     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1666810&r1=1666809&r2=1666810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Mar 15 15:25:15 2015
@@ -755,6 +755,12 @@ public class AMQChannel
                     {
                         return message.getConnectionReference() != connectionReference;
                     }
+
+                    @Override
+                    public boolean startAtTail()
+                    {
+                        return false;
+                    }
                 };
                 filterManager.add(filter.getName(), filter);
             }
@@ -788,7 +794,7 @@ public class AMQChannel
                 {
                     filterManager = new FilterManager();
                 }
-                MessageFilter filter = new ArrivalTimeFilter(startingFrom);
+                MessageFilter filter = new ArrivalTimeFilter(startingFrom, period==0);
                 filterManager.add(filter.getName(), filter);
 
             }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org