You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/03/18 17:24:36 UTC
svn commit: r924881 - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/binding/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/exchange/topic/
broker/src/test/java/org/apache/qpid/...
Author: robbie
Date: Thu Mar 18 16:24:36 2010
New Revision: 924881
URL: http://svn.apache.org/viewvc?rev=924881&view=rev
Log:
QPID-2397: add Binding.msgMatched() support to the TopicExchange, and remove its internal usage of the TopicBinding class
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=924881&r1=924880&r2=924881&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java Thu Mar 18 16:24:36 2010
@@ -89,29 +89,30 @@ public class Binding
@Override
public boolean equals(final Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (o == null || !(o instanceof Binding))
+ {
+ return false;
+ }
final Binding binding = (Binding) o;
- if (!_bindingKey.equals(binding._bindingKey)) return false;
- if (!_exchange.equals(binding._exchange)) return false;
- if (!_queue.equals(binding._queue)) return false;
-
- return true;
+ return (_bindingKey == null ? binding.getBindingKey() == null : _bindingKey.equals(binding.getBindingKey()))
+ && (_exchange == null ? binding.getExchange() == null : _exchange.equals(binding.getExchange()))
+ && (_queue == null ? binding.getQueue() == null : _queue.equals(binding.getQueue()));
}
@Override
public int hashCode()
{
- int result = _bindingKey.hashCode();
- result = 31 * result + _queue.hashCode();
- result = 31 * result + _exchange.hashCode();
+ int result = _bindingKey == null ? 1 : _bindingKey.hashCode();
+ result = 31 * result + (_queue == null ? 3 : _queue.hashCode());
+ result = 31 * result + (_exchange == null ? 5 : _exchange.hashCode());
return result;
}
-
-
-
-
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=924881&r1=924880&r2=924881&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Thu Mar 18 16:24:36 2010
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.topic.*;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.message.InboundMessage;
@@ -83,7 +84,7 @@ public class TopicExchange extends Abstr
private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
- private final Map<TopicBinding, FieldTable> _bindings = new HashMap<TopicBinding, FieldTable>();
+ private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
@@ -92,20 +93,12 @@ public class TopicExchange extends Abstr
super(TYPE);
}
- public synchronized void registerQueue(String rKey, AMQQueue queue, Map<String,Object> args)
- {
- try
- {
- registerQueue(new AMQShortString(rKey), queue, FieldTable.convertToFieldTable(args));
- }
- catch (AMQInvalidArgumentException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQInvalidArgumentException
+ protected synchronized void registerQueue(final Binding binding) throws AMQInvalidArgumentException
{
+ AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ;
+ AMQQueue queue = binding.getQueue();
+ FieldTable args = FieldTable.convertToFieldTable(binding.getArguments());
+
assert queue != null;
assert rKey != null;
@@ -114,8 +107,6 @@ public class TopicExchange extends Abstr
AMQShortString routingKey = TopicNormalizer.normalize(rKey);
- TopicBinding binding = new TopicBinding(rKey, queue, args);
-
if(_bindings.containsKey(binding))
{
FieldTable oldArgs = _bindings.get(binding);
@@ -146,6 +137,8 @@ public class TopicExchange extends Abstr
return;
}
}
+
+ result.addBinding(binding);
}
else
@@ -177,6 +170,8 @@ public class TopicExchange extends Abstr
result.addUnfilteredQueue(queue);
}
}
+
+ result.addBinding(binding);
_bindings.put(binding, args);
}
@@ -210,11 +205,19 @@ public class TopicExchange extends Abstr
? AMQShortString.EMPTY_STRING
: new AMQShortString(payload.getRoutingKey());
+ _logger.info("Message routing key: " + routingKey );
+
// The copy here is unfortunate, but not too bad relevant to the amount of
// things created and copied in getMatchedQueues
ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>();
queues.addAll(getMatchedQueues(payload, routingKey));
+ for(BaseQueue q : queues)
+ {
+ _logger.info("Matched Queue: " + q.getNameShortString() );
+ }
+
+
if(queues == null || queues.isEmpty())
{
_logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
@@ -226,7 +229,8 @@ public class TopicExchange extends Abstr
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
{
- TopicBinding binding = new TopicBinding(routingKey, queue, arguments);
+ Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments));
+
if (arguments == null)
{
return _bindings.containsKey(binding);
@@ -253,7 +257,7 @@ public class TopicExchange extends Abstr
public boolean isBound(AMQShortString routingKey)
{
- for(TopicBinding b : _bindings.keySet())
+ for(Binding b : _bindings.keySet())
{
if(b.getBindingKey().equals(routingKey))
{
@@ -266,7 +270,7 @@ public class TopicExchange extends Abstr
public boolean isBound(AMQQueue queue)
{
- for(TopicBinding b : _bindings.keySet())
+ for(Binding b : _bindings.keySet())
{
if(b.getQueue().equals(queue))
{
@@ -282,19 +286,16 @@ public class TopicExchange extends Abstr
return !_bindings.isEmpty();
}
-
- public void deregisterQueue(String rKey, AMQQueue queue, Map<String, Object> args)
- {
- removeBinding(new TopicBinding(new AMQShortString(rKey), queue, FieldTable.convertToFieldTable(args)));
- }
-
- private boolean removeBinding(final TopicBinding binding)
+ private boolean deregisterQueue(final Binding binding)
{
if(_bindings.containsKey(binding))
{
FieldTable bindingArgs = _bindings.remove(binding);
- AMQShortString bindingKey = TopicNormalizer.normalize(binding.getBindingKey());
+ AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
+
+ result.removeBinding(binding);
+
if(argumentsContainSelector(bindingArgs))
{
try
@@ -341,8 +342,14 @@ public class TopicExchange extends Abstr
Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
for(TopicMatcherResult result : results)
{
+ TopicExchangeResult res = (TopicExchangeResult)result;
- queues = ((TopicExchangeResult)result).processMessage(message, queues);
+ for(Binding b : res.getBindings())
+ {
+ b.incrementMatches();
+ }
+
+ queues = res.processMessage(message, queues);
}
return queues;
}
@@ -350,14 +357,21 @@ public class TopicExchange extends Abstr
}
- protected void onBind(final org.apache.qpid.server.binding.Binding binding)
+ protected void onBind(final Binding binding)
{
- registerQueue(binding.getBindingKey(),binding.getQueue(),binding.getArguments());
+ try
+ {
+ registerQueue(binding);
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- protected void onUnbind(final org.apache.qpid.server.binding.Binding binding)
+ protected void onUnbind(final Binding binding)
{
- deregisterQueue(binding.getBindingKey(),binding.getQueue(),binding.getArguments());
+ deregisterQueue(binding);
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=924881&r1=924880&r2=924881&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java Thu Mar 18 16:24:36 2010
@@ -21,14 +21,22 @@
package org.apache.qpid.server.exchange.topic;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
public final class TopicExchangeResult implements TopicMatcherResult
{
+ private final List<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
@@ -64,6 +72,20 @@ public final class TopicExchangeResult i
return _unfilteredQueues.keySet();
}
+ public void addBinding(Binding binding)
+ {
+ _bindings.add(binding);
+ }
+
+ public void removeBinding(Binding binding)
+ {
+ _bindings.remove(binding);
+ }
+
+ public List<Binding> getBindings()
+ {
+ return new ArrayList<Binding>(_bindings);
+ }
public void addFilteredQueue(AMQQueue queue, MessageFilter filter)
{
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=924881&r1=924880&r2=924881&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Thu Mar 18 16:24:36 2010
@@ -28,6 +28,7 @@ import org.apache.qpid.server.registry.A
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.AMQException;
@@ -64,7 +65,7 @@ public class TopicExchangeTest extends T
public void testNoRoute() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null);
- _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
+ _exchange.registerQueue(new Binding(null,"a.*.#.b", queue,_exchange, null));
IncomingMessage message = createMessage("a.b");
@@ -76,7 +77,7 @@ public class TopicExchangeTest extends T
public void testDirectMatch() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("ab"), false, null, false, _vhost, null);
- _exchange.registerQueue(new AMQShortString("a.b"), queue, null);
+ _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null));
IncomingMessage message = createMessage("a.b");
@@ -103,7 +104,7 @@ public class TopicExchangeTest extends T
public void testStarMatch() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*"), false, null, false, _vhost, null);
- _exchange.registerQueue(new AMQShortString("a.*"), queue, null);
+ _exchange.registerQueue(new Binding(null,"a.*", queue,_exchange, null));
IncomingMessage message = createMessage("a.b");
@@ -142,7 +143,7 @@ public class TopicExchangeTest extends T
public void testHashMatch() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null);
- _exchange.registerQueue(new AMQShortString("a.#"), queue, null);
+ _exchange.registerQueue(new Binding(null,"a.#", queue,_exchange, null));
IncomingMessage message = createMessage("a.b.c");
@@ -205,7 +206,7 @@ public class TopicExchangeTest extends T
public void testMidHash() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null);
- _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
+ _exchange.registerQueue(new Binding(null,"a.*.#.b", queue,_exchange, null));
IncomingMessage message = createMessage("a.c.d.b");
@@ -235,7 +236,7 @@ public class TopicExchangeTest extends T
public void testMatchafterHash() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null);
- _exchange.registerQueue(new AMQShortString("a.*.#.b.c"), queue, null);
+ _exchange.registerQueue(new Binding(null,"a.*.#.b.c", queue,_exchange, null));
IncomingMessage message = createMessage("a.c.b.b");
@@ -281,7 +282,7 @@ public class TopicExchangeTest extends T
public void testHashAfterHash() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null);
- _exchange.registerQueue(new AMQShortString("a.*.#.b.c.#.d"), queue, null);
+ _exchange.registerQueue(new Binding(null,"a.*.#.b.c.#.d", queue,_exchange, null));
IncomingMessage message = createMessage("a.c.b.b.c");
@@ -308,7 +309,7 @@ public class TopicExchangeTest extends T
public void testHashHash() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null);
- _exchange.registerQueue(new AMQShortString("a.#.*.#.d"), queue, null);
+ _exchange.registerQueue(new Binding(null,"a.#.*.#.d", queue,_exchange, null));
IncomingMessage message = createMessage("a.c.b.b.c");
@@ -334,7 +335,7 @@ public class TopicExchangeTest extends T
public void testSubMatchFails() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null);
- _exchange.registerQueue(new AMQShortString("a.b.c.d"), queue, null);
+ _exchange.registerQueue(new Binding(null,"a.b.c.d", queue,_exchange, null));
IncomingMessage message = createMessage("a.b.c");
@@ -364,7 +365,7 @@ public class TopicExchangeTest extends T
public void testMoreRouting() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null);
- _exchange.registerQueue(new AMQShortString("a.b"), queue, null);
+ _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null));
IncomingMessage message = createMessage("a.b.c");
@@ -379,7 +380,7 @@ public class TopicExchangeTest extends T
public void testMoreQueue() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null);
- _exchange.registerQueue(new AMQShortString("a.b"), queue, null);
+ _exchange.registerQueue(new Binding(null,"a.b", queue,_exchange, null));
IncomingMessage message = createMessage("a");
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=924881&r1=924880&r2=924881&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Thu Mar 18 16:24:36 2010
@@ -334,6 +334,7 @@ public class DurableSubscriptionTest ext
{
_logger.info("Receive message on consumer 3 :expecting B");
msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull(msg);
assertEquals("B", ((TextMessage) msg).getText());
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org