You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/11 17:22:05 UTC
svn commit: r655323 [1/4] - in
/incubator/qpid/branches/broker-queue-refactor/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main...
Author: rgodfrey
Date: Sun May 11 08:22:03 2008
New Revision: 655323
URL: http://svn.apache.org/viewvc?rev=655323&view=rev
Log:
Updates on the refactoring work
Added:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
- copied, changed from r650226, incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
Removed:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueImpl.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryAgent.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
incubator/qpid/branches/broker-queue-refactor/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/Job.java
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java
incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Sun May 11 08:22:03 2008
@@ -176,7 +176,8 @@
ownerShortString = new AMQShortString(owner);
}
- queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost());
+ queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost(),
+ null);
if (queue.isDurable() && !queue.isAutoDelete())
{
_messageStore.createQueue(queue);
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Sun May 11 08:22:03 2008
@@ -44,6 +44,8 @@
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
@@ -435,10 +437,8 @@
}
}
- synchronized (_unacknowledgedMessageMap.getLock())
- {
- _unacknowledgedMessageMap.add(deliveryTag, entry);
- }
+ _unacknowledgedMessageMap.add(deliveryTag, entry);
+
}
private final String id = "(" + System.identityHashCode(this) + ")";
@@ -807,22 +807,7 @@
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
- synchronized (_unacknowledgedMessageMap.getLock())
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Unacked (PreAck) Size:" + _unacknowledgedMessageMap.size());
- }
-
- _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Unacked (PostAck) Size:" + _unacknowledgedMessageMap.size());
- }
-
- }
-
+ _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
}
/**
@@ -952,4 +937,33 @@
{
return _messageStore;
}
+
+ private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod()
+ {
+
+ public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ throws AMQException
+ {
+ getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag());
+ }
+ };
+
+ public ClientDeliveryMethod getClientDeliveryMethod()
+ {
+ return _clientDeliveryMethod;
+ }
+
+ private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
+ {
+
+ public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ {
+ addUnacknowledgedMessage(entry, deliveryTag, sub);
+ }
+ };
+
+ public RecordDeliveryMethod getRecordDeliveryMethod()
+ {
+ return _recordDeliveryMethod;
+ }
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Sun May 11 08:22:03 2008
@@ -45,8 +45,6 @@
void visit(Visitor visitor) throws AMQException;
- Object getLock();
-
void add(long deliveryTag, QueueEntry message);
void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs);
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Sun May 11 08:22:03 2008
@@ -113,11 +113,6 @@
}
}
- public Object getLock()
- {
- return _lock;
- }
-
public void add(long deliveryTag, QueueEntry message)
{
synchronized (_lock)
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Sun May 11 08:22:03 2008
@@ -30,6 +30,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
@@ -177,11 +178,22 @@
boolean durable = queueConfiguration.getBoolean("durable" ,false);
boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
String owner = queueConfiguration.getString("owner", null);
+ FieldTable arguments = null;
+ Integer priorities = queueConfiguration.getInteger("priorities", null);
+ if(priorities != null && priorities.intValue() > 1)
+ {
+ if(arguments == null)
+ {
+ arguments = new FieldTable();
+ }
+ arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
+ }
+
queue = AMQQueueFactory.createAMQQueueImpl(queueName,
durable,
owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */,
- autodelete /* Therefore autodelete makes no sence */, virtualHost);
+ autodelete /* Therefore autodelete makes no sence */, virtualHost, arguments);
if (queue.isDurable())
{
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Sun May 11 08:22:03 2008
@@ -38,7 +38,6 @@
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueueImpl;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -192,9 +191,7 @@
{
_exchangeMbean.unregister();
}
- }
-
- abstract public Map<AMQShortString, List<AMQQueue>> getBindings();
+ }
public String toString()
{
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Sun May 11 08:22:03 2008
@@ -93,6 +93,6 @@
*/
boolean hasBindings();
- Map<AMQShortString, List<AMQQueue>> getBindings();
+
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Sun May 11 08:22:03 2008
@@ -31,7 +31,6 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQQueueImpl;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Sun May 11 08:22:03 2008
@@ -22,6 +22,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -31,7 +32,12 @@
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.topic.TopicParser;
+import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -43,6 +49,9 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.lang.ref.WeakReference;
public class TopicExchange extends AbstractExchange
{
@@ -80,22 +89,204 @@
private static final Logger _logger = Logger.getLogger(TopicExchange.class);
+/*
private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+*/
// private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private static final byte TOPIC_SEPARATOR = (byte)'.';
private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString(".");
private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*");
private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#");
- private ConcurrentHashMap<AMQShortString, AMQShortString[]> _bindingKey2Tokenized =
- new ConcurrentHashMap<AMQShortString, AMQShortString[]>();
+
private static final byte HASH_BYTE = (byte)'#';
private static final byte STAR_BYTE = (byte)'*';
+ private final TopicParser _parser = new TopicParser();
+
+ private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
+ new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
+
+ private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
+
+ private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>();
+
+ public static class Binding
+ {
+ private final AMQShortString _bindingKey;
+ private final AMQQueue _queue;
+
+ public Binding(AMQShortString bindingKey, AMQQueue queue)
+ {
+ _bindingKey = bindingKey;
+ _queue = queue;
+ }
+
+ public AMQShortString getBindingKey()
+ {
+ return _bindingKey;
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public int hashCode()
+ {
+ return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 + _queue.hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ if(this == o)
+ {
+ return true;
+ }
+ if(o instanceof Binding)
+ {
+ Binding other = (Binding) o;
+ return (_queue == other._queue)
+ && ((_bindingKey == null) ? other._bindingKey == null : _bindingKey.equals(other._bindingKey));
+ }
+ return false;
+ }
+ }
+
+
+
+ private final class TopicExchangeResult implements TopicMatcherResult
+ {
+ private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
+ private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>();
+
+ public void addUnfilteredQueue(AMQQueue queue)
+ {
+ Integer instances = _unfilteredQueues.get(queue);
+ if(instances == null)
+ {
+ _unfilteredQueues.put(queue, 1);
+ }
+ else
+ {
+ _unfilteredQueues.put(queue, instances + 1);
+ }
+ }
+
+ public void removeUnfilteredQueue(AMQQueue queue)
+ {
+ Integer instances = _unfilteredQueues.get(queue);
+ if(instances == 1)
+ {
+ _unfilteredQueues.remove(queue);
+ }
+ else
+ {
+ _unfilteredQueues.put(queue,instances - 1);
+ }
+
+ }
+
+
+ public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+ {
+ Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+ if(filters == null)
+ {
+ filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>();
+ _filteredQueues.put(queue, filters);
+ }
+ Integer instances = filters.get(filter);
+ if(instances == null)
+ {
+ filters.put(filter,1);
+ }
+ else
+ {
+ filters.put(filter, instances + 1);
+ }
+
+ }
+
+ public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+ {
+ Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+ if(filters != null)
+ {
+ Integer instances = filters.get(filter);
+ if(instances == 1)
+ {
+ filters.remove(filter);
+ if(filters.isEmpty())
+ {
+ _filteredQueues.remove(queue);
+ }
+ }
+ else if(instances != null)
+ {
+ filters.put(filter, instances - 1);
+ }
+
+ }
+
+ }
+
+ public void replaceQueueFilter(AMQQueue queue,
+ MessageFilter<RuntimeException> oldFilter,
+ MessageFilter<RuntimeException> newFilter)
+ {
+ Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+ Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters);
+ Integer oldFilterInstances = filters.get(oldFilter);
+ if(oldFilterInstances == 1)
+ {
+ newFilters.remove(oldFilter);
+ }
+ else
+ {
+ newFilters.put(oldFilter, oldFilterInstances-1);
+ }
+ Integer newFilterInstances = filters.get(newFilter);
+ if(newFilterInstances == null)
+ {
+ newFilters.put(newFilter, 1);
+ }
+ else
+ {
+ newFilters.put(newFilter, newFilterInstances+1);
+ }
+ _filteredQueues.put(queue,newFilters);
+ }
+
+ public Set<AMQQueue> processMessage(IncomingMessage msg, Set<AMQQueue> queues)
+ {
+ queues.addAll(_unfilteredQueues.keySet());
+ if(!_filteredQueues.isEmpty())
+ {
+ for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet())
+ {
+ if(!queues.contains(entry.getKey()))
+ {
+ for(MessageFilter<RuntimeException> filter : entry.getValue().keySet())
+ {
+ if(filter.matches(msg))
+ {
+ queues.add(entry.getKey());
+ }
+ }
+ }
+ }
+ }
+ return queues;
+ }
+
+ }
+
+
/** TopicExchangeMBean class implements the management interface for the Topic exchanges. */
@MBeanDescription("Management Bean for Topic Exchange")
private final class TopicExchangeMBean extends ExchangeMBean
@@ -112,20 +303,24 @@
public TabularData bindings() throws OpenDataException
{
_bindingList = new TabularDataSupport(_bindinglistDataType);
- for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _bindingKey2queues.entrySet())
+ Map<String, List<String>> bindingData = new HashMap<String, List<String>>();
+ for (Binding binding : _bindings.keySet())
{
- AMQShortString key = entry.getKey();
- List<String> queueList = new ArrayList<String>();
-
- List<AMQQueue> queues = getMatchedQueues(key);
- for (AMQQueue q : queues)
+ String key = binding.getBindingKey().toString();
+ List<String> queueNames = bindingData.get(key);
+ if(queueNames == null)
{
- queueList.add(q.getName().toString());
+ queueNames = new ArrayList<String>();
+ bindingData.put(key, queueNames);
}
+ queueNames.add(binding.getQueue().getName().toString());
- Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[queueList.size()])};
- CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
- _bindingList.put(bindingData);
+ }
+ for(Map.Entry<String, List<String>> entry : bindingData.entrySet())
+ {
+ Object[] bindingItemValues = {entry.getKey(), entry.getValue().toArray(new String[entry.getValue().size()]) };
+ CompositeData bindingCompositeData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
+ _bindingList.put(bindingCompositeData);
}
return _bindingList;
@@ -163,73 +358,106 @@
_logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey);
- // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
- List<AMQQueue> queueList = _bindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
-
-
+ AMQShortString routingKey;
-
-
-
- // if we got null back, no previous value was associated with the specified routing key hence
- // we need to read back the new value just put into the map
- if (queueList == null)
+ if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
{
- queueList = _bindingKey2queues.get(rKey);
+ routingKey = normalize(rKey);
+ }
+ else
+ {
+ routingKey = rKey;
}
+ Binding binding = new Binding(rKey, queue);
-
- if (!queueList.contains(queue))
+ if(_bindings.containsKey(binding))
{
- queueList.add(queue);
+ FieldTable oldArgs = _bindings.get(binding);
+ TopicExchangeResult result = _topicExchangeResults.get(routingKey);
-
- if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+ if(argumentsContainSelector(args))
{
- AMQShortString routingKey = normalize(rKey);
- List<AMQQueue> queueList2 = _wildCardBindingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
-
- if(queueList2 == null)
+ if(argumentsContainSelector(oldArgs))
{
- queueList2 = _wildCardBindingKey2queues.get(routingKey);
- AMQShortStringTokenizer keyTok = routingKey.tokenize(TOPIC_SEPARATOR);
-
- ArrayList<AMQShortString> keyTokList = new ArrayList<AMQShortString>(keyTok.countTokens());
-
- while (keyTok.hasMoreTokens())
- {
- keyTokList.add(keyTok.nextToken());
- }
-
- _bindingKey2Tokenized.put(routingKey, keyTokList.toArray(new AMQShortString[keyTokList.size()]));
+ result.replaceQueueFilter(queue,createSelectorFilter(oldArgs), createSelectorFilter(args));
+ }
+ else
+ {
+ result.addFilteredQueue(queue,createSelectorFilter(args));
+ result.removeUnfilteredQueue(queue);
}
- queueList2.add(queue);
-
}
else
{
- List<AMQQueue> queueList2 = _simpleBindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
- if(queueList2 == null)
+ if(argumentsContainSelector(oldArgs))
+ {
+ result.addUnfilteredQueue(queue);
+ result.removeFilteredQueue(queue, createSelectorFilter(oldArgs));
+ }
+ else
{
- queueList2 = _simpleBindingKey2queues.get(rKey);
+ // TODO - fix control flow
+ return;
}
- queueList2.add(queue);
+ }
+
+ }
+ else
+ {
+ TopicExchangeResult result = _topicExchangeResults.get(routingKey);
+ if(result == null)
+ {
+ result = new TopicExchangeResult();
+ if(argumentsContainSelector(args))
+ {
+ result.addFilteredQueue(queue, createSelectorFilter(args));
+ }
+ else
+ {
+ result.addUnfilteredQueue(queue);
+ }
+ _parser.addBinding(routingKey, result);
+ _topicExchangeResults.put(routingKey,result);
}
+ else
+ {
+ if(argumentsContainSelector(args))
+ {
+ result.addFilteredQueue(queue, createSelectorFilter(args));
+ }
+ else
+ {
+ result.addUnfilteredQueue(queue);
+ }
+ }
+ _bindings.put(binding, args);
+ }
+ }
+ private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args)
+ throws AMQException
+ {
- }
- else if (_logger.isDebugEnabled())
+ final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
+ WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString);
+ JMSSelectorFilter selector = null;
+
+ if(selectorRef == null || (selector = selectorRef.get())==null)
{
- _logger.debug("Queue " + queue + " is already registered with routing key " + rKey);
+ selector = new JMSSelectorFilter<RuntimeException>(selectorString);
+ _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
}
+ return selector;
+ }
-
-
+ /**
+  * Tells whether the binding arguments carry a non-blank JMS selector.
+  *
+  * @param args the binding arguments, may be null
+  * @return true if the selector key is present and its string value is not empty after trimming
+  */
+ private static boolean argumentsContainSelector(final FieldTable args)
+ {
+     if(args == null || !args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()))
+     {
+         return false;
+     }
+     // NOTE(review): presumably getString yields null when the stored value is not a string;
+     // guard so that such arguments are treated as "no selector" rather than failing here.
+     final String selector = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
+     return selector != null && selector.trim().length() != 0;
}
private AMQShortString normalize(AMQShortString routingKey)
@@ -279,16 +507,6 @@
AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING);
-/*
- StringBuilder sb = new StringBuilder();
- for (AMQShortString s : subscriptionList)
- {
- sb.append(s);
- sb.append(TOPIC_SEPARATOR);
- }
-
- sb.deleteCharAt(sb.length() - 1);
-*/
return normalizedString;
}
@@ -298,11 +516,11 @@
final AMQShortString routingKey = payload.getRoutingKey();
- List<AMQQueue> queues = getMatchedQueues(routingKey);
+ Collection<AMQQueue> queues = getMatchedQueues(payload, routingKey);
if(queues == null || queues.isEmpty())
{
- _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes - " + _bindingKey2queues);
+ _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
}
payload.enqueue(queues);
@@ -316,23 +534,29 @@
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
- List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
+ Binding binding = new Binding(routingKey, queue);
- return (queues != null) && queues.contains(queue);
+ return _bindings.containsKey(binding);
}
public boolean isBound(AMQShortString routingKey)
{
- List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
+ for(Binding b : _bindings.keySet())
+ {
+ if(b.getBindingKey().equals(routingKey))
+ {
+ return true;
+ }
+ }
- return (queues != null) && !queues.isEmpty();
+ return false;
}
public boolean isBound(AMQQueue queue)
{
- for (List<AMQQueue> queues : _bindingKey2queues.values())
+ for(Binding b : _bindings.keySet())
{
- if (queues.contains(queue))
+ if(b.getQueue().equals(queue))
{
return true;
}
@@ -343,7 +567,7 @@
public boolean hasBindings()
{
- return !_bindingKey2queues.isEmpty();
+ return !_bindings.isEmpty();
}
public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
@@ -351,52 +575,27 @@
assert queue != null;
assert rKey != null;
- List<AMQQueue> queues = _bindingKey2queues.get(rKey);
- if (queues == null)
- {
- throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
- + " with routing key " + rKey + ". No queue was registered with that _routing key");
+ Binding binding = new Binding(rKey, queue);
- }
- boolean removedQ = queues.remove(queue);
- if (!removedQ)
+ if (!_bindings.containsKey(binding))
{
- throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
- + " with routing key " + rKey);
+ throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue.getName() + " was not registered with exchange " + this.getName()
+ + " with routing key " + rKey + ".");
}
-
- if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+ FieldTable bindingArgs = _bindings.remove(binding);
+ AMQShortString bindingKey = normalize(rKey);
+ TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
+ if(argumentsContainSelector(bindingArgs))
{
- AMQShortString bindingKey = normalize(rKey);
- List<AMQQueue> queues2 = _wildCardBindingKey2queues.get(bindingKey);
- queues2.remove(queue);
- if(queues2.isEmpty())
- {
- _wildCardBindingKey2queues.remove(bindingKey);
- _bindingKey2Tokenized.remove(bindingKey);
- }
-
+ result.removeFilteredQueue(queue, createSelectorFilter(bindingArgs));
}
else
{
- List<AMQQueue> queues2 = _simpleBindingKey2queues.get(rKey);
- queues2.remove(queue);
- if(queues2.isEmpty())
- {
- _simpleBindingKey2queues.remove(rKey);
- }
-
+ result.removeUnfilteredQueue(queue);
}
-
-
-
- if (queues.isEmpty())
- {
- _bindingKey2queues.remove(rKey);
- }
}
protected ExchangeMBean createMBean() throws AMQException
@@ -412,172 +611,25 @@
}
}
- public Map<AMQShortString, List<AMQQueue>> getBindings()
+ private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
{
- return _bindingKey2queues;
- }
-
- private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
- {
-
- List<AMQQueue> list = null;
- if(!_wildCardBindingKey2queues.isEmpty())
+ Collection<TopicMatcherResult> results = _parser.parse(routingKey);
+ if(results.isEmpty())
{
-
-
- AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
-
- final int routingTokensCount = routingTokens.countTokens();
-
-
- AMQShortString[] routingkeyTokens = new AMQShortString[routingTokensCount];
-
- if(routingTokensCount == 1)
- {
- routingkeyTokens[0] =routingKey;
- }
- else
- {
-
-
- int token = 0;
- while (routingTokens.hasMoreTokens())
- {
-
- AMQShortString next = routingTokens.nextToken();
- /* if (next.equals(AMQP_HASH) && routingkeyTokens.get(routingkeyTokens.size() - 1).equals(AMQP_HASH))
- {
- continue;
- }
- */
-
- routingkeyTokens[token++] = next;
- }
- }
-
- _logger.info("Routing key tokens: " + Arrays.asList(routingkeyTokens));
-
- for (AMQShortString bindingKey : _wildCardBindingKey2queues.keySet())
- {
-
- AMQShortString[] bindingKeyTokens = _bindingKey2Tokenized.get(bindingKey);
-
-
- boolean matching = true;
- boolean done = false;
-
- int depthPlusRoutingSkip = 0;
- int depthPlusQueueSkip = 0;
-
- final int bindingKeyTokensCount = bindingKeyTokens.length;
-
- while (matching && !done)
- {
-
- if ((bindingKeyTokensCount == depthPlusQueueSkip) || (routingTokensCount == depthPlusRoutingSkip))
- {
- done = true;
-
- // if it was the routing key that ran out of digits
- if (routingTokensCount == depthPlusRoutingSkip)
- {
- if (bindingKeyTokensCount > depthPlusQueueSkip)
- { // a hash and it is the last entry
- matching =
- bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN)
- && (bindingKeyTokensCount == (depthPlusQueueSkip + 1));
- }
- }
- else if (routingTokensCount > depthPlusRoutingSkip)
- {
- // There is still more routing key to check
- matching = false;
- }
-
- continue;
- }
-
- // if the values on the two topics don't match
- if (!bindingKeyTokens[depthPlusQueueSkip].equals(routingkeyTokens[depthPlusRoutingSkip]))
- {
- if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_STAR_TOKEN))
- {
- depthPlusQueueSkip++;
- depthPlusRoutingSkip++;
-
- continue;
- }
- else if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN))
- {
- // Is this a # at the end
- if (bindingKeyTokensCount == (depthPlusQueueSkip + 1))
- {
- done = true;
-
- continue;
- }
-
- // otherwise # in the middle
- while (routingTokensCount > depthPlusRoutingSkip)
- {
- if (routingkeyTokens[depthPlusRoutingSkip].equals(bindingKeyTokens[depthPlusQueueSkip + 1]))
- {
- depthPlusQueueSkip += 2;
- depthPlusRoutingSkip++;
-
- break;
- }
-
- depthPlusRoutingSkip++;
- }
-
- continue;
- }
-
- matching = false;
- }
-
- depthPlusQueueSkip++;
- depthPlusRoutingSkip++;
- }
-
- if (matching)
- {
- if(list == null)
- {
- list = new ArrayList<AMQQueue>(_wildCardBindingKey2queues.get(bindingKey));
- }
- else
- {
- list.addAll(_wildCardBindingKey2queues.get(bindingKey));
- }
- }
- }
-
+ return Collections.EMPTY_SET;
}
- if(!_simpleBindingKey2queues.isEmpty())
+ else
{
- List<AMQQueue> queues = _simpleBindingKey2queues.get(routingKey);
- if(list == null)
- {
- if(queues == null)
- {
- list = Collections.EMPTY_LIST;
- }
- else
- {
- list = new ArrayList<AMQQueue>(queues);
- }
- }
- else if(queues != null)
+ Set<AMQQueue> queues = new HashSet<AMQQueue>();
+ for(TopicMatcherResult result : results)
{
- list.addAll(queues);
- }
+ ((TopicExchangeResult)result).processMessage(message, queues);
+ }
+ return queues;
}
- return list;
}
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java Sun May 11 08:22:03 2008
@@ -4,6 +4,7 @@
import org.apache.qpid.framing.AMQShortStringTokenizer;
import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
/*
*
@@ -27,6 +28,10 @@
*/
public class TopicMatcherDFAState
{
+ private static final AtomicInteger stateId = new AtomicInteger();
+
+ private final int _id = stateId.incrementAndGet();
+
private final Collection<TopicMatcherResult> _results;
private final Map<TopicWord, TopicMatcherDFAState> _nextStateMap;
private static final byte TOPIC_DELIMITTER = (byte)'.';
@@ -233,4 +238,58 @@
}
+ /**
+  * Renders this state as a header line carrying its id, followed by one line per outgoing
+  * transition showing the triggering word and the id of the destination state.
+  */
+ public String toString()
+ {
+     StringBuilder text = new StringBuilder("[ State ");
+     text.append(_id).append(" ]\n");
+
+     for(Map.Entry<TopicWord, TopicMatcherDFAState> transition : _nextStateMap.entrySet())
+     {
+         text.append("[ ")
+             .append(transition.getKey())
+             .append("\t ->\t ")
+             .append(transition.getValue()._id)
+             .append(" ]\n");
+     }
+
+     return text.append("\n").toString();
+ }
+
+ /**
+  * Builds a textual dump of every state reachable from this one (this state included),
+  * listed in ascending id order, each rendered with {@link #toString()}.
+  *
+  * @return the dump, starting with a line naming this state's id
+  */
+ public String reachableStates()
+ {
+     StringBuilder result = new StringBuilder("Start state: " + _id + "\n");
+
+     SortedSet<TopicMatcherDFAState> reachableStates =
+             new TreeSet<TopicMatcherDFAState>(new Comparator<TopicMatcherDFAState>()
+             {
+                 public int compare(final TopicMatcherDFAState o1, final TopicMatcherDFAState o2)
+                 {
+                     // Compare explicitly rather than subtracting: a difference of two ints can overflow.
+                     return (o1._id < o2._id) ? -1 : ((o1._id == o2._id) ? 0 : 1);
+                 }
+             });
+     reachableStates.add(this);
+
+     int count;
+
+     // Keep adding the successors of every known state until a pass discovers nothing new.
+     do
+     {
+         count = reachableStates.size();
+         // Iterate over a snapshot: the set itself is modified inside the loop.
+         Collection<TopicMatcherDFAState> originalStates = new ArrayList<TopicMatcherDFAState>(reachableStates);
+         for(TopicMatcherDFAState state : originalStates)
+         {
+             reachableStates.addAll(state._nextStateMap.values());
+         }
+     }
+     while(reachableStates.size() != count);
+
+     for(TopicMatcherDFAState state : reachableStates)
+     {
+         result.append(state.toString());
+     }
+
+     return result.toString();
+ }
+
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java Sun May 11 08:22:03 2008
@@ -20,6 +20,6 @@
* under the License.
*
*/
-public class TopicMatcherResult
+public interface TopicMatcherResult
{
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java Sun May 11 08:22:03 2008
@@ -4,6 +4,8 @@
import org.apache.qpid.framing.AMQShortStringTokenizer;
import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.io.IOException;
/*
*
@@ -30,6 +32,7 @@
private static final byte TOPIC_DELIMITER = (byte)'.';
private final TopicWordDictionary _dictionary = new TopicWordDictionary();
+ private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<TopicMatcherDFAState>();
private static class Position
{
@@ -37,6 +40,7 @@
private final boolean _selfTransition;
private final int _position;
private final boolean _endState;
+ private boolean _followedByAnyLoop;
public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState)
@@ -59,6 +63,43 @@
}
+ /**
+  * Adds a binding to the matcher. A state machine is built for {@code bindingKey}, merged with
+  * the currently installed machine when there is one, and installed with a compare-and-set;
+  * the whole step is repeated until the compare-and-set succeeds.
+  *
+  * @param bindingKey the binding key to add
+  * @param result     the result associated with the binding key
+  */
+ public void addBinding(AMQShortString bindingKey, TopicMatcherResult result)
+ {
+     boolean installed = false;
+     while(!installed)
+     {
+         final TopicMatcherDFAState current = _stateMachine.get();
+         final TopicMatcherDFAState addition = createStateMachine(bindingKey, result);
+         final TopicMatcherDFAState updated =
+                 (current == null) ? addition : current.mergeStateMachines(addition);
+
+         // Fails when the installed machine is no longer the one read above; then start over.
+         installed = _stateMachine.compareAndSet(current, updated);
+     }
+ }
+
+ /**
+  * Runs {@code routingKey} through the currently installed state machine.
+  *
+  * @param routingKey the routing key to match
+  * @return the results produced by the state machine, or an empty set when no state machine
+  *         has been installed yet
+  */
+ public Collection<TopicMatcherResult> parse(AMQShortString routingKey)
+ {
+     TopicMatcherDFAState stateMachine = _stateMachine.get();
+     if(stateMachine == null)
+     {
+         // Typed factory instead of the raw EMPTY_SET constant, avoiding an unchecked conversion.
+         return Collections.<TopicMatcherResult>emptySet();
+     }
+     return stateMachine.parse(_dictionary,routingKey);
+ }
+
+
TopicMatcherDFAState createStateMachine(AMQShortString bindingKey, TopicMatcherResult result)
{
List<TopicWord> wordList = createTopicWordList(bindingKey);
@@ -108,7 +149,6 @@
}
-
int pos = 0;
int wordPos = 0;
@@ -131,6 +171,31 @@
}
+
+ for(int p = 0; p<positionCount; p++)
+ {
+ boolean followedByWildcards = true;
+
+ int n = p;
+ while(followedByWildcards && n<(positionCount+1))
+ {
+
+ if(positions[n]._selfTransition)
+ {
+ break;
+ }
+ else if(positions[n]._word!=TopicWord.ANY_WORD)
+ {
+ followedByWildcards = false;
+ }
+ n++;
+ }
+
+
+ positions[p]._followedByAnyLoop = followedByWildcards && (n!= positionCount+1);
+ }
+
+
// from each position you transition to a set of other positions.
// we approach this by examining steps of increasing length - so we
// look how far we can go from the start position in 1 word, 2 words, etc...
@@ -258,6 +323,32 @@
{
dest.setValue(Collections.singleton(loopingTerminal));
}
+ else
+ {
+ Position anyLoop = null;
+ for(Position destPos : dest.getValue())
+ {
+ if(destPos._followedByAnyLoop)
+ {
+ if(anyLoop == null || anyLoop._position<destPos._position)
+ {
+ anyLoop = destPos;
+ }
+ }
+ }
+ if(anyLoop != null)
+ {
+ Collection<Position> removals = new ArrayList<Position>();
+ for(Position destPos : dest.getValue())
+ {
+ if(destPos._position < anyLoop._position)
+ {
+ removals.add(destPos);
+ }
+ }
+ dest.getValue().removeAll(removals);
+ }
+ }
SimpleState stateForEntry = stateMap.get(dest.getValue());
if(stateForEntry == null)
@@ -332,8 +423,65 @@
public static void main(String[] args)
{
+
+ printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.*.q.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
+ printMatches(new String[]{
+ "#.a.#",
+ "#.b.#",
+ "#.c.#",
+ "#.d.#",
+ "#.e.#",
+ "#.f.#",
+ "#.g.#",
+ "#.h.#",
+ "#.i.#",
+ "#.j.#",
+ "#.k.#",
+ "#.l.#",
+ "#.m.#",
+ "#.n.#",
+ "#.o.#",
+ "#.p.#",
+ "#.q.#"
+
+ }, "a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
+/*
+ printMatches(new String[]{
+ "#.a.#",
+ "#.b.#",
+ "#.c.#",
+ "#.d.#",
+ "#.e.#",
+ "#.f.#",
+ "#.g.#",
+ "#.h.#",
+ "#.i.#",
+ "#.j.#",
+ "#.k.#",
+ "#.l.#",
+ "#.m.#",
+ "#.n.#",
+ "#.o.#",
+ "#.p.#",
+ "#.q.#",
+ "#.r.#",
+ "#.s.#",
+ "#.t.#",
+ "#.u.#",
+ "#.v.#",
+ "#.w.#",
+ "#.x.#",
+ "#.y.#",
+ "#.z.#"
+
+
+ },"a.b");
+
+ printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
+ printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
printMatches("a.#.b.#","a.b.b.b.b.b.b.b.c");
+*/
printMatches("","");
printMatches("a","a");
@@ -394,12 +542,19 @@
TopicParser parser = new TopicParser();
+ long start = System.currentTimeMillis();
for(int i = 0; i < bindingKeys.length; i++)
{
- TopicMatcherResult r = new TopicMatcherResult();
+ System.out.println((System.currentTimeMillis() - start) + ":\t" + bindingKeys[i]);
+ TopicMatcherResult r = new TopicMatcherResult(){};
resultMap.put(r, bindingKeys[i]);
AMQShortString bindingKeyShortString = new AMQShortString(bindingKeys[i]);
+ System.err.println("=====================================================");
+ System.err.println("Adding binding key: " + bindingKeyShortString);
+ System.err.println("-----------------------------------------------------");
+
+
if(i==0)
{
sm = parser.createStateMachine(bindingKeyShortString, r);
@@ -408,6 +563,16 @@
{
sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeyShortString, r));
}
+ System.err.println(sm.reachableStates());
+ System.err.println("=====================================================");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
}
AMQShortString routingKeyShortString = new AMQShortString(routingKey);
@@ -438,7 +603,7 @@
AMQShortString routingKeyShortString = new AMQShortString(routingKey);
TopicParser parser = new TopicParser();
- final TopicMatcherResult result = new TopicMatcherResult();
+ final TopicMatcherResult result = new TopicMatcherResult(){};
TopicMatcherDFAState sm = parser.createStateMachine(bindingKeyShortString, result);
return !sm.parse(parser._dictionary,routingKeyShortString).isEmpty();
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java Sun May 11 08:22:03 2008
@@ -28,7 +28,27 @@
*/
public final class TopicWord
{
- public static final TopicWord ANY_WORD = new TopicWord();
- public static final TopicWord WILDCARD_WORD = new TopicWord();
+ public static final TopicWord ANY_WORD = new TopicWord("*");
+ public static final TopicWord WILDCARD_WORD = new TopicWord("#");
+ /** Text returned by {@link #toString()}; null when the word was created without one. */
+ private final String _word;
+
+ /** Creates a word with no text; {@link #toString()} returns null for it. */
+ public TopicWord()
+ {
+     _word = null;
+ }
+
+ /**
+  * Creates a word with the given text.
+  *
+  * @param s the text returned by {@link #toString()}
+  */
+ public TopicWord(String s)
+ {
+     _word = s;
+ }
+
+ /**
+  * Creates a word whose text is the string form of {@code name}.
+  *
+  * @param name the name to take the text from; must not be null
+  */
+ public TopicWord(final AMQShortString name)
+ {
+     _word = name.toString();
+ }
+
+ /** Returns the text this word was created with. */
+ public String toString()
+ {
+     return _word;
+ }
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java Sun May 11 08:22:03 2008
@@ -42,7 +42,7 @@
public TopicWord getOrCreateWord(AMQShortString name)
{
- TopicWord word = _dictionary.putIfAbsent(name, new TopicWord());
+ TopicWord word = _dictionary.putIfAbsent(name, new TopicWord(name));
if(word == null)
{
word = _dictionary.get(name);
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java Sun May 11 08:22:03 2008
@@ -21,12 +21,12 @@
//
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
/**
* An expression which performs an operation on two expression values
*/
-public abstract class ArithmeticExpression extends BinaryExpression
+public abstract class ArithmeticExpression<E extends Exception> extends BinaryExpression<E>
{
protected static final int INTEGER = 1;
@@ -248,7 +248,7 @@
}
}
- public Object evaluate(AMQMessage message) throws AMQException
+ public Object evaluate(Filterable<E> message) throws E
{
Object lvalue = left.evaluate(message);
if (lvalue == null)
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java Sun May 11 08:22:03 2008
@@ -23,23 +23,23 @@
/**
* An expression which performs an operation on two expression values.
*/
-public abstract class BinaryExpression implements Expression
+public abstract class BinaryExpression<E extends Exception> implements Expression<E>
{
- protected Expression left;
- protected Expression right;
+ protected Expression<E> left;
+ protected Expression<E> right;
- public BinaryExpression(Expression left, Expression right)
+ public BinaryExpression(Expression<E> left, Expression<E> right)
{
this.left = left;
this.right = right;
}
- public Expression getLeft()
+ public Expression<E> getLeft()
{
return left;
}
- public Expression getRight()
+ public Expression<E> getRight()
{
return right;
}
@@ -90,7 +90,7 @@
/**
* @param expression
*/
- public void setRight(Expression expression)
+ public void setRight(Expression<E> expression)
{
right = expression;
}
@@ -98,7 +98,7 @@
/**
* @param expression
*/
- public void setLeft(Expression expression)
+ public void setLeft(Expression<E> expression)
{
left = expression;
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java Sun May 11 08:22:03 2008
@@ -22,19 +22,20 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
/**
* A BooleanExpression is an expression that always
* produces a Boolean result.
*/
-public interface BooleanExpression extends Expression
+public interface BooleanExpression<E extends Exception> extends Expression<E>
{
/**
* @param message
* @return true if the expression evaluates to Boolean.TRUE.
- * @throws AMQException
+ * @throws E
*/
- public boolean matches(AMQMessage message) throws AMQException;
+ public boolean matches(Filterable<E> message) throws E;
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java Sun May 11 08:22:03 2008
@@ -29,19 +29,20 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
/**
* A filter performing a comparison of two objects
*/
-public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression
+public abstract class ComparisonExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E>
{
- public static BooleanExpression createBetween(Expression value, Expression left, Expression right)
+ /**
+  * Builds the expression {@code left <= value AND value <= right}.
+  * All three operands share the exception type {@code E}, matching {@code createNotBetween}.
+  */
+ public static<E extends Exception> BooleanExpression<E> createBetween(Expression<E> value, Expression<E> left, Expression<E> right)
{
return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right));
}
- public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right)
+ public static<E extends Exception> BooleanExpression<E> createNotBetween(Expression<E> value, Expression<E> left, Expression<E> right)
{
return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
}
@@ -72,7 +73,7 @@
REGEXP_CONTROL_CHARS.add(new Character('!'));
}
- static class LikeExpression extends UnaryExpression implements BooleanExpression
+ static class LikeExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E>
{
Pattern likePattern;
@@ -80,7 +81,7 @@
/**
* @param right
*/
- public LikeExpression(Expression right, String like, int escape)
+ public LikeExpression(Expression<E> right, String like, int escape)
{
super(right);
@@ -137,7 +138,7 @@
/**
* org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
*/
- public Object evaluate(AMQMessage message) throws AMQException
+ public Object evaluate(Filterable<E> message) throws E
{
Object rv = this.getRight().evaluate(message);
@@ -157,7 +158,7 @@
return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
}
- public boolean matches(AMQMessage message) throws AMQException
+ public boolean matches(Filterable<E> message) throws E
{
Object object = evaluate(message);
@@ -235,45 +236,9 @@
return doCreateEqual(left, right);
}
- private static BooleanExpression doCreateEqual(Expression left, Expression right)
+ private static<E extends Exception> BooleanExpression<E> doCreateEqual(Expression<E> left, Expression<E> right)
{
- return new ComparisonExpression(left, right)
- {
-
- public Object evaluate(AMQMessage message) throws AMQException
- {
- Object lv = left.evaluate(message);
- Object rv = right.evaluate(message);
-
- // Iff one of the values is null
- if ((lv == null) ^ (rv == null))
- {
- return Boolean.FALSE;
- }
-
- if ((lv == rv) || lv.equals(rv))
- {
- return Boolean.TRUE;
- }
-
- if ((lv instanceof Comparable) && (rv instanceof Comparable))
- {
- return compare((Comparable) lv, (Comparable) rv);
- }
-
- return Boolean.FALSE;
- }
-
- protected boolean asBoolean(int answer)
- {
- return answer == 0;
- }
-
- public String getExpressionSymbol()
- {
- return "=";
- }
- };
+ return new EqualExpression(left, right);
}
public static BooleanExpression createGreaterThan(final Expression left, final Expression right)
@@ -423,7 +388,7 @@
super(left, right);
}
- public Object evaluate(AMQMessage message) throws AMQException
+ public Object evaluate(Filterable<E> message) throws E
{
Comparable lv = (Comparable) left.evaluate(message);
if (lv == null)
@@ -585,11 +550,52 @@
protected abstract boolean asBoolean(int answer);
- public boolean matches(AMQMessage message) throws AMQException
+ public boolean matches(Filterable<E> message) throws E
{
Object object = evaluate(message);
return (object != null) && (object == Boolean.TRUE);
}
+ private static class EqualExpression<E extends Exception> extends ComparisonExpression<E>
+ {
+ public EqualExpression(final Expression<E> left, final Expression<E> right)
+ {
+ super(left, right);
+ }
+
+ public Object evaluate(Filterable<E> message) throws E
+ {
+ Object lv = left.evaluate(message);
+ Object rv = right.evaluate(message);
+
+ // Iff one of the values is null
+ if ((lv == null) ^ (rv == null))
+ {
+ return Boolean.FALSE;
+ }
+
+ if ((lv == rv) || lv.equals(rv))
+ {
+ return Boolean.TRUE;
+ }
+
+ if ((lv instanceof Comparable) && (rv instanceof Comparable))
+ {
+ return compare((Comparable) lv, (Comparable) rv);
+ }
+
+ return Boolean.FALSE;
+ }
+
+ protected boolean asBoolean(int answer)
+ {
+ return answer == 0;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "=";
+ }
+ }
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java Sun May 11 08:22:03 2008
@@ -27,21 +27,22 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
/**
* Represents a constant expression
*/
-public class ConstantExpression implements Expression
+public class ConstantExpression<E extends Exception> implements Expression<E>
{
- static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression
+ static class BooleanConstantExpression<E extends Exception> extends ConstantExpression<E> implements BooleanExpression<E>
{
public BooleanConstantExpression(Object value)
{
super(value);
}
- public boolean matches(AMQMessage message) throws AMQException
+ public boolean matches(Filterable<E> message) throws E
{
Object object = evaluate(message);
@@ -120,7 +121,7 @@
this.value = value;
}
- public Object evaluate(AMQMessage message) throws AMQException
+ public Object evaluate(Filterable<E> message) throws E
{
return value;
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java Sun May 11 08:22:03 2008
@@ -22,16 +22,17 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
/**
* Represents an expression
*/
-public interface Expression
+public interface Expression<E extends Exception>
{
/**
* @return the value of this expression
*/
- public Object evaluate(AMQMessage message) throws AMQException;
+ public Object evaluate(Filterable<E> message) throws E;
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java Sun May 11 08:22:03 2008
@@ -24,14 +24,16 @@
//
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
+import org.apache.qpid.AMQException;
-public interface FilterManager
+public interface FilterManager<E extends Exception>
{
- void add(MessageFilter filter);
+ void add(MessageFilter<E> filter);
- void remove(MessageFilter filter);
+ void remove(MessageFilter<E> filter);
- boolean allAllow(AMQMessage msg);
+ boolean allAllow(Filterable<E> msg);
boolean hasFilters();
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java Sun May 11 08:22:03 2008
@@ -39,7 +39,7 @@
if (filters != null)
{
- manager = new SimpleFilterManager();
+
if(filters.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()))
{
@@ -47,23 +47,13 @@
if (selector != null && !selector.equals(""))
{
+ manager = new SimpleFilterManager();
manager.add(new JMSSelectorFilter(selector));
}
}
- if (filters.containsKey(AMQPFilterTypes.NO_CONSUME.getValue()))
- {
- manager.add(new NoConsumerFilter());
- }
-
-
- //If we added no filters don't bear the overhead of having an filter manager
- if (!manager.hasFilters())
- {
- manager = null;
- }
}
else
{
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Sun May 11 08:22:03 2008
@@ -23,42 +23,30 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.jms.selector.SelectorParser;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
-
-public class JMSSelectorFilter implements MessageFilter
+public class JMSSelectorFilter<E extends Exception> implements MessageFilter<E>
{
private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
private String _selector;
- private BooleanExpression _matcher;
+ private BooleanExpression<E> _matcher;
public JMSSelectorFilter(String selector) throws AMQException
{
_selector = selector;
- _logger.info("Created JMSSelectorFilter with selector:" + _selector);
-
-
_matcher = new SelectorParser().parse(selector);
-
-
}
- public boolean matches(AMQMessage message)
+ public boolean matches(Filterable<E> message) throws E
{
- try
- {
- boolean match = _matcher.matches(message);
- _logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
- return match;
- }
- catch (AMQException e)
+ boolean match = _matcher.matches(message);
+ if(_logger.isDebugEnabled())
{
- //fixme this needs to be sorted.. it shouldn't happen
- e.printStackTrace();
+ _logger.debug(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
}
- return false;
+ return match;
}
public String getSelector()