You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/07/17 11:26:51 UTC

svn commit: r556861 - in /incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server: exchange/ handler/

Author: ritchiem
Date: Tue Jul 17 02:26:47 2007
New Revision: 556861

URL: http://svn.apache.org/viewvc?view=rev&rev=556861
Log:
QPID-538 Check to ensure a duplicate binding is not created.

Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=556861&r1=556860&r2=556861
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Tue Jul 17 02:26:47 2007
@@ -187,19 +187,24 @@
         }
     }
 
-    public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+    public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+    {
+        return isBound(routingKey,queue);
+    }
+
+    public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
         final List<AMQQueue> queues = _index.get(routingKey);
         return queues != null && queues.contains(queue);
     }
 
-    public boolean isBound(AMQShortString routingKey) throws AMQException
+    public boolean isBound(AMQShortString routingKey)
     {
         final List<AMQQueue> queues = _index.get(routingKey);
         return queues != null && !queues.isEmpty();
     }
 
-    public boolean isBound(AMQQueue queue) throws AMQException
+    public boolean isBound(AMQQueue queue)
     {
         Map<AMQShortString, List<AMQQueue>> bindings = _index.getBindingsMap();
         for (List<AMQQueue> queues : bindings.values())
@@ -212,7 +217,7 @@
         return false;
     }
 
-    public boolean hasBindings() throws AMQException
+    public boolean hasBindings()
     {
         return !_index.getBindingsMap().isEmpty();
     }

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=556861&r1=556860&r2=556861
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Tue Jul 17 02:26:47 2007
@@ -230,21 +230,26 @@
         }
     }
 
-    public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+    public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+    {
+        return isBound(routingKey, queue);
+    }
+
+    public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
         List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
 
         return (queues != null) && queues.contains(queue);
     }
 
-    public boolean isBound(AMQShortString routingKey) throws AMQException
+    public boolean isBound(AMQShortString routingKey)
     {
         List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
 
         return (queues != null) && !queues.isEmpty();
     }
 
-    public boolean isBound(AMQQueue queue) throws AMQException
+    public boolean isBound(AMQQueue queue)
     {
         for (List<AMQQueue> queues : _routingKey2queues.values())
         {
@@ -257,7 +262,7 @@
         return false;
     }
 
-    public boolean hasBindings() throws AMQException
+    public boolean hasBindings()
     {
         return !_routingKey2queues.isEmpty();
     }

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?view=diff&rev=556861&r1=556860&r2=556861
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Tue Jul 17 02:26:47 2007
@@ -27,8 +27,8 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.Map;
 import java.util.List;
+import java.util.Map;
 
 public interface Exchange
 {
@@ -55,6 +55,17 @@
 
     void route(AMQMessage message) throws AMQException;
 
+
+    /**
+     * Determines whether a particular queue is bound to this exchange using a specific routing key and arguments
+     * @param routingKey
+     * @param arguments
+     * @param queue
+     * @return
+     * @throws AMQException
+     */
+    boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue);
+
     /**
      * Determines whether a message would be isBound to a particular queue using a specific routing key
      * @param routingKey
@@ -62,7 +73,7 @@
      * @return
      * @throws AMQException
      */
-    boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException;
+    boolean isBound(AMQShortString routingKey, AMQQueue queue);
 
     /**
      * Determines whether a message is routing to any queue using a specific _routing key
@@ -70,16 +81,17 @@
      * @return
      * @throws AMQException
      */
-    boolean isBound(AMQShortString routingKey) throws AMQException;
+    boolean isBound(AMQShortString routingKey);
 
-    boolean isBound(AMQQueue queue) throws AMQException;
+    boolean isBound(AMQQueue queue);
 
     /**
      * Returns true if this exchange has at least one binding associated with it.
      * @return
      * @throws AMQException
      */
-    boolean hasBindings() throws AMQException;
+    boolean hasBindings();
 
     Map<AMQShortString, List<AMQQueue>> getBindings();
+
 }

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?view=diff&rev=556861&r1=556860&r2=556861
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Tue Jul 17 02:26:47 2007
@@ -180,24 +180,29 @@
         }
     }
 
-    public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+    public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+    {
+        return isBound(routingKey, queue);
+    }
+
+    public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
         return _queues.contains(queue);
     }
 
-    public boolean isBound(AMQShortString routingKey) throws AMQException
+    public boolean isBound(AMQShortString routingKey)
     {
 
         return (_queues != null) && !_queues.isEmpty();
     }
 
-    public boolean isBound(AMQQueue queue) throws AMQException
+    public boolean isBound(AMQQueue queue)
     {
 
         return _queues.contains(queue);
     }
 
-    public boolean hasBindings() throws AMQException
+    public boolean hasBindings()
     {
         return !_queues.isEmpty();
     }

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=556861&r1=556860&r2=556861
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue Jul 17 02:26:47 2007
@@ -241,17 +241,23 @@
         }
     }
 
-    public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+    public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+    {
+        //fixme isBound here should take the arguments into consideration.
+        return isBound(routingKey, queue);
+    }
+
+    public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
         return isBound(queue);
     }
 
-    public boolean isBound(AMQShortString routingKey) throws AMQException
+    public boolean isBound(AMQShortString routingKey)
     {
         return hasBindings();
     }
 
-    public boolean isBound(AMQQueue queue) throws AMQException
+    public boolean isBound(AMQQueue queue)
     {
         for (Registration r : _bindings)
         {
@@ -263,7 +269,7 @@
         return false;
     }
 
-    public boolean hasBindings() throws AMQException
+    public boolean hasBindings()
     {
         return !_bindings.isEmpty();
     }

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=556861&r1=556860&r2=556861
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Tue Jul 17 02:26:47 2007
@@ -28,6 +28,7 @@
 import org.apache.qpid.framing.QueueBindOkBody;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -36,7 +37,6 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
 
 public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
 {
@@ -77,7 +77,7 @@
             {
                 throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
             }
-            
+
             if (body.routingKey == null)
             {
                 body.routingKey = queue.getName();
@@ -98,8 +98,11 @@
             throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
         }
         try
-        {            
-            queue.bind(body.routingKey, body.arguments, exch);
+        {
+            if (!exch.isBound(body.routingKey, body.arguments, queue))
+            {
+                queue.bind(body.routingKey, body.arguments, exch);
+            }
         }
         catch (AMQInvalidRoutingKeyException rke)
         {