You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/07/17 11:26:51 UTC
svn commit: r556861 - in
/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server:
exchange/ handler/
Author: ritchiem
Date: Tue Jul 17 02:26:47 2007
New Revision: 556861
URL: http://svn.apache.org/viewvc?view=rev&rev=556861
Log:
QPID-538 Check to ensure a duplicate binding is not created.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=556861&r1=556860&r2=556861
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Tue Jul 17 02:26:47 2007
@@ -187,19 +187,24 @@
}
}
- public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ return isBound(routingKey,queue);
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
final List<AMQQueue> queues = _index.get(routingKey);
return queues != null && queues.contains(queue);
}
- public boolean isBound(AMQShortString routingKey) throws AMQException
+ public boolean isBound(AMQShortString routingKey)
{
final List<AMQQueue> queues = _index.get(routingKey);
return queues != null && !queues.isEmpty();
}
- public boolean isBound(AMQQueue queue) throws AMQException
+ public boolean isBound(AMQQueue queue)
{
Map<AMQShortString, List<AMQQueue>> bindings = _index.getBindingsMap();
for (List<AMQQueue> queues : bindings.values())
@@ -212,7 +217,7 @@
return false;
}
- public boolean hasBindings() throws AMQException
+ public boolean hasBindings()
{
return !_index.getBindingsMap().isEmpty();
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=556861&r1=556860&r2=556861
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Tue Jul 17 02:26:47 2007
@@ -230,21 +230,26 @@
}
}
- public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ return isBound(routingKey, queue);
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
return (queues != null) && queues.contains(queue);
}
- public boolean isBound(AMQShortString routingKey) throws AMQException
+ public boolean isBound(AMQShortString routingKey)
{
List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
return (queues != null) && !queues.isEmpty();
}
- public boolean isBound(AMQQueue queue) throws AMQException
+ public boolean isBound(AMQQueue queue)
{
for (List<AMQQueue> queues : _routingKey2queues.values())
{
@@ -257,7 +262,7 @@
return false;
}
- public boolean hasBindings() throws AMQException
+ public boolean hasBindings()
{
return !_routingKey2queues.isEmpty();
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?view=diff&rev=556861&r1=556860&r2=556861
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Tue Jul 17 02:26:47 2007
@@ -27,8 +27,8 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.Map;
import java.util.List;
+import java.util.Map;
public interface Exchange
{
@@ -55,6 +55,17 @@
void route(AMQMessage message) throws AMQException;
+
+ /**
+ * Determines whether a message would be routed to a particular queue using a specific routing key and arguments
+ * @param routingKey
+ * @param arguments
+ * @param queue
+ * @return
+ * @throws AMQException
+ */
+ boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue);
+
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key
* @param routingKey
@@ -62,7 +73,7 @@
* @return
* @throws AMQException
*/
- boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException;
+ boolean isBound(AMQShortString routingKey, AMQQueue queue);
/**
* Determines whether a message is routing to any queue using a specific _routing key
@@ -70,16 +81,17 @@
* @return
* @throws AMQException
*/
- boolean isBound(AMQShortString routingKey) throws AMQException;
+ boolean isBound(AMQShortString routingKey);
- boolean isBound(AMQQueue queue) throws AMQException;
+ boolean isBound(AMQQueue queue);
/**
* Returns true if this exchange has at least one binding associated with it.
* @return
* @throws AMQException
*/
- boolean hasBindings() throws AMQException;
+ boolean hasBindings();
Map<AMQShortString, List<AMQQueue>> getBindings();
+
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?view=diff&rev=556861&r1=556860&r2=556861
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Tue Jul 17 02:26:47 2007
@@ -180,24 +180,29 @@
}
}
- public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ return isBound(routingKey, queue);
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
return _queues.contains(queue);
}
- public boolean isBound(AMQShortString routingKey) throws AMQException
+ public boolean isBound(AMQShortString routingKey)
{
return (_queues != null) && !_queues.isEmpty();
}
- public boolean isBound(AMQQueue queue) throws AMQException
+ public boolean isBound(AMQQueue queue)
{
return _queues.contains(queue);
}
- public boolean hasBindings() throws AMQException
+ public boolean hasBindings()
{
return !_queues.isEmpty();
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=556861&r1=556860&r2=556861
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue Jul 17 02:26:47 2007
@@ -241,17 +241,23 @@
}
}
- public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ //fixme isBound here should take the arguments into consideration.
+ return isBound(routingKey, queue);
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
return isBound(queue);
}
- public boolean isBound(AMQShortString routingKey) throws AMQException
+ public boolean isBound(AMQShortString routingKey)
{
return hasBindings();
}
- public boolean isBound(AMQQueue queue) throws AMQException
+ public boolean isBound(AMQQueue queue)
{
for (Registration r : _bindings)
{
@@ -263,7 +269,7 @@
return false;
}
- public boolean hasBindings() throws AMQException
+ public boolean hasBindings()
{
return !_bindings.isEmpty();
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=556861&r1=556860&r2=556861
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Tue Jul 17 02:26:47 2007
@@ -28,6 +28,7 @@
import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -36,7 +37,6 @@
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
@@ -77,7 +77,7 @@
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
}
-
+
if (body.routingKey == null)
{
body.routingKey = queue.getName();
@@ -98,8 +98,11 @@
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
}
try
- {
- queue.bind(body.routingKey, body.arguments, exch);
+ {
+ if (!exch.isBound(body.routingKey, body.arguments, queue))
+ {
+ queue.bind(body.routingKey, body.arguments, exch);
+ }
}
catch (AMQInvalidRoutingKeyException rke)
{