You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/03/18 17:24:36 UTC

svn commit: r924881 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/binding/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/exchange/topic/ broker/src/test/java/org/apache/qpid/...

Author: robbie
Date: Thu Mar 18 16:24:36 2010
New Revision: 924881

URL: http://svn.apache.org/viewvc?rev=924881&view=rev
Log:
QPID-2397: add Binding.msgMatched() support to the TopicExchange, and remove its internal usage of the TopicBinding class

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=924881&r1=924880&r2=924881&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java Thu Mar 18 16:24:36 2010
@@ -89,29 +89,30 @@ public class Binding
     @Override
     public boolean equals(final Object o)
     {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o)
+        {
+            return true;
+        }
+        
+        if (o == null || !(o instanceof Binding))
+        {
+            return false;
+        }
 
         final Binding binding = (Binding) o;
 
-        if (!_bindingKey.equals(binding._bindingKey)) return false;
-        if (!_exchange.equals(binding._exchange)) return false;
-        if (!_queue.equals(binding._queue)) return false;
-
-        return true;
+        return (_bindingKey == null ? binding.getBindingKey() == null : _bindingKey.equals(binding.getBindingKey()))
+            && (_exchange == null ? binding.getExchange() == null : _exchange.equals(binding.getExchange()))
+            && (_queue == null ? binding.getQueue() == null : _queue.equals(binding.getQueue()));
     }
 
     @Override
     public int hashCode()
     {
-        int result = _bindingKey.hashCode();
-        result = 31 * result + _queue.hashCode();
-        result = 31 * result + _exchange.hashCode();
+        int result = _bindingKey == null ? 1 : _bindingKey.hashCode();
+        result = 31 * result + (_queue == null ? 3 : _queue.hashCode());
+        result = 31 * result + (_exchange == null ? 5 : _exchange.hashCode());
         return result;
     }
 
-
-
-
-
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=924881&r1=924880&r2=924881&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Thu Mar 18 16:24:36 2010
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.exchange.topic.*;
 import org.apache.qpid.server.filter.JMSSelectorFilter;
 import org.apache.qpid.server.message.InboundMessage;
@@ -83,7 +84,7 @@ public class TopicExchange extends Abstr
     private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
             new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
 
-    private final Map<TopicBinding, FieldTable> _bindings = new HashMap<TopicBinding, FieldTable>();
+    private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
 
     private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
 
@@ -92,20 +93,12 @@ public class TopicExchange extends Abstr
         super(TYPE);
     }
 
-    public synchronized void registerQueue(String rKey, AMQQueue queue, Map<String,Object> args)
-    {
-        try
-        {
-            registerQueue(new AMQShortString(rKey), queue, FieldTable.convertToFieldTable(args));
-        }
-        catch (AMQInvalidArgumentException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQInvalidArgumentException
+    protected synchronized void registerQueue(final Binding binding) throws AMQInvalidArgumentException
     {
+        AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ;
+        AMQQueue queue = binding.getQueue();
+        FieldTable args = FieldTable.convertToFieldTable(binding.getArguments());
+        
         assert queue != null;
         assert rKey != null;
 
@@ -114,8 +107,6 @@ public class TopicExchange extends Abstr
 
         AMQShortString routingKey = TopicNormalizer.normalize(rKey);
 
-        TopicBinding binding = new TopicBinding(rKey, queue, args);
-
         if(_bindings.containsKey(binding))
         {
             FieldTable oldArgs = _bindings.get(binding);
@@ -146,6 +137,8 @@ public class TopicExchange extends Abstr
                     return;
                 }
             }
+            
+            result.addBinding(binding);
 
         }
         else
@@ -177,6 +170,8 @@ public class TopicExchange extends Abstr
                     result.addUnfilteredQueue(queue);
                 }
             }
+            
+            result.addBinding(binding);
             _bindings.put(binding, args);
         }
 
@@ -210,11 +205,19 @@ public class TopicExchange extends Abstr
                                           ? AMQShortString.EMPTY_STRING
                                           : new AMQShortString(payload.getRoutingKey());
 
+        _logger.info("Message routing key: " + routingKey );
+        
         // The copy here is unfortunate, but not too bad relevant to the amount of
         // things created and copied in getMatchedQueues
         ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>();
         queues.addAll(getMatchedQueues(payload, routingKey));
 
+        for(BaseQueue q : queues)
+        {
+            _logger.info("Matched Queue: " + q.getNameShortString() );
+        }
+
+        
         if(queues == null || queues.isEmpty())
         {
             _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
@@ -226,7 +229,8 @@ public class TopicExchange extends Abstr
 
     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
     {
-        TopicBinding binding = new TopicBinding(routingKey, queue, arguments);
+        Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments));
+        
         if (arguments == null)
         {
             return _bindings.containsKey(binding);
@@ -253,7 +257,7 @@ public class TopicExchange extends Abstr
 
     public boolean isBound(AMQShortString routingKey)
     {
-        for(TopicBinding b : _bindings.keySet())
+        for(Binding b : _bindings.keySet())
         {
             if(b.getBindingKey().equals(routingKey))
             {
@@ -266,7 +270,7 @@ public class TopicExchange extends Abstr
 
     public boolean isBound(AMQQueue queue)
     {
-        for(TopicBinding b : _bindings.keySet())
+        for(Binding b : _bindings.keySet())
         {
             if(b.getQueue().equals(queue))
             {
@@ -282,19 +286,16 @@ public class TopicExchange extends Abstr
         return !_bindings.isEmpty();
     }
 
-
-    public void deregisterQueue(String rKey, AMQQueue queue, Map<String, Object> args)
-    {
-        removeBinding(new TopicBinding(new AMQShortString(rKey), queue, FieldTable.convertToFieldTable(args)));
-    }
-
-    private boolean removeBinding(final TopicBinding binding)
+    private boolean deregisterQueue(final Binding binding)
     {
         if(_bindings.containsKey(binding))
         {
             FieldTable bindingArgs = _bindings.remove(binding);
-            AMQShortString bindingKey = TopicNormalizer.normalize(binding.getBindingKey());
+            AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
             TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
+            
+            result.removeBinding(binding);
+            
             if(argumentsContainSelector(bindingArgs))
             {
                 try
@@ -341,8 +342,14 @@ public class TopicExchange extends Abstr
             Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
             for(TopicMatcherResult result : results)
             {
+                TopicExchangeResult res = (TopicExchangeResult)result;
 
-                queues = ((TopicExchangeResult)result).processMessage(message, queues);
+                for(Binding b : res.getBindings())
+                {
+                    b.incrementMatches();
+                }
+                
+                queues = res.processMessage(message, queues);
             }
             return queues;
         }
@@ -350,14 +357,21 @@ public class TopicExchange extends Abstr
 
     }
 
-    protected void onBind(final org.apache.qpid.server.binding.Binding binding)
+    protected void onBind(final Binding binding)
     {
-        registerQueue(binding.getBindingKey(),binding.getQueue(),binding.getArguments());
+        try
+        {
+            registerQueue(binding);
+        }
+        catch (AMQInvalidArgumentException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
-    protected void onUnbind(final org.apache.qpid.server.binding.Binding binding)
+    protected void onUnbind(final Binding binding)
     {
-        deregisterQueue(binding.getBindingKey(),binding.getQueue(),binding.getArguments());
+        deregisterQueue(binding);
     }
 
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=924881&r1=924880&r2=924881&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java Thu Mar 18 16:24:36 2010
@@ -21,14 +21,22 @@
 package org.apache.qpid.server.exchange.topic;
 
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.InboundMessage;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public final class TopicExchangeResult implements TopicMatcherResult
 {
+    private final List<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
     private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
     private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
 
@@ -64,6 +72,20 @@ public final class TopicExchangeResult i
         return _unfilteredQueues.keySet();
     }
 
+    public void addBinding(Binding binding)
+    {
+        _bindings.add(binding);
+    }
+    
+    public void removeBinding(Binding binding)
+    {
+        _bindings.remove(binding);
+    }
+    
+    public List<Binding> getBindings()
+    {
+        return new ArrayList<Binding>(_bindings);
+    }
 
     public void addFilteredQueue(AMQQueue queue, MessageFilter filter)
     {

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=924881&r1=924880&r2=924881&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Thu Mar 18 16:24:36 2010
@@ -28,6 +28,7 @@ import org.apache.qpid.server.registry.A
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.AMQException;
@@ -64,7 +65,7 @@ public class TopicExchangeTest extends T
     public void testNoRoute() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null);
-        _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
+        _exchange.registerQueue(new Binding(null,"a.*.#.b", queue,_exchange, null));
 
 
         IncomingMessage message = createMessage("a.b");
@@ -76,7 +77,7 @@ public class TopicExchangeTest extends T
     public void testDirectMatch() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("ab"), false, null, false, _vhost, null);
-        _exchange.registerQueue(new AMQShortString("a.b"), queue, null);
+        _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null));
 
 
         IncomingMessage message = createMessage("a.b");
@@ -103,7 +104,7 @@ public class TopicExchangeTest extends T
     public void testStarMatch() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*"), false, null, false, _vhost, null);
-        _exchange.registerQueue(new AMQShortString("a.*"), queue, null);
+        _exchange.registerQueue(new Binding(null,"a.*", queue,_exchange, null));
 
 
         IncomingMessage message = createMessage("a.b");
@@ -142,7 +143,7 @@ public class TopicExchangeTest extends T
     public void testHashMatch() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null);
-        _exchange.registerQueue(new AMQShortString("a.#"), queue, null);
+        _exchange.registerQueue(new Binding(null,"a.#", queue,_exchange, null));
 
 
         IncomingMessage message = createMessage("a.b.c");
@@ -205,7 +206,7 @@ public class TopicExchangeTest extends T
     public void testMidHash() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null);
-        _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
+        _exchange.registerQueue(new Binding(null,"a.*.#.b", queue,_exchange, null));
 
 
         IncomingMessage message = createMessage("a.c.d.b");
@@ -235,7 +236,7 @@ public class TopicExchangeTest extends T
     public void testMatchafterHash() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null);
-        _exchange.registerQueue(new AMQShortString("a.*.#.b.c"), queue, null);
+        _exchange.registerQueue(new Binding(null,"a.*.#.b.c", queue,_exchange, null));
 
 
         IncomingMessage message = createMessage("a.c.b.b");
@@ -281,7 +282,7 @@ public class TopicExchangeTest extends T
     public void testHashAfterHash() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null);
-        _exchange.registerQueue(new AMQShortString("a.*.#.b.c.#.d"), queue, null);
+        _exchange.registerQueue(new Binding(null,"a.*.#.b.c.#.d", queue,_exchange, null));
 
 
         IncomingMessage message = createMessage("a.c.b.b.c");
@@ -308,7 +309,7 @@ public class TopicExchangeTest extends T
     public void testHashHash() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null);
-        _exchange.registerQueue(new AMQShortString("a.#.*.#.d"), queue, null);
+        _exchange.registerQueue(new Binding(null,"a.#.*.#.d", queue,_exchange, null));
 
 
         IncomingMessage message = createMessage("a.c.b.b.c");
@@ -334,7 +335,7 @@ public class TopicExchangeTest extends T
     public void testSubMatchFails() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null);
-        _exchange.registerQueue(new AMQShortString("a.b.c.d"), queue, null);
+        _exchange.registerQueue(new Binding(null,"a.b.c.d", queue,_exchange, null));
 
 
         IncomingMessage message = createMessage("a.b.c");
@@ -364,7 +365,7 @@ public class TopicExchangeTest extends T
     public void testMoreRouting() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null);
-        _exchange.registerQueue(new AMQShortString("a.b"), queue, null);
+        _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null));
 
 
         IncomingMessage message = createMessage("a.b.c");
@@ -379,7 +380,7 @@ public class TopicExchangeTest extends T
     public void testMoreQueue() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null);
-        _exchange.registerQueue(new AMQShortString("a.b"), queue, null);
+        _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null));
 
 
         IncomingMessage message = createMessage("a");

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=924881&r1=924880&r2=924881&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Thu Mar 18 16:24:36 2010
@@ -334,6 +334,7 @@ public class DurableSubscriptionTest ext
         {
             _logger.info("Receive message on consumer 3 :expecting B");
             msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+            assertNotNull(msg);
             assertEquals("B", ((TextMessage) msg).getText());
         }
         



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org