You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/07/20 21:05:08 UTC
svn commit: r795958 [1/3] - in /qpid/branches/java-broker-0-10/qpid/java:
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/main/java/org/apache/q...
Author: rgodfrey
Date: Mon Jul 20 19:05:05 2009
New Revision: 795958
URL: http://svn.apache.org/viewvc?rev=795958&view=rev
Log:
Java Broker 0-10 Exploratory work
Added:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
Modified: qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Mon Jul 20 19:05:05 2009
@@ -20,10 +20,7 @@
*/
package org.apache.qpid.extras.exchanges.diagnostic;
-import java.util.List;
-import java.util.Map;
import java.util.ArrayList;
-import java.util.Collection;
import javax.management.JMException;
import javax.management.openmbean.OpenDataException;
@@ -34,8 +31,8 @@
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.AbstractExchange;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.junit.extensions.util.SizeOf;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
@@ -193,20 +190,20 @@
return false;
}
- public void route(IncomingMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
{
Long value = new Long(SizeOf.getUsedMemory());
AMQShortString key = new AMQShortString("memory");
- FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).getHeaders();
+ FieldTable headers = ((BasicContentHeaderProperties)payload.getMessageHeader().properties).getHeaders();
headers.put(key, value);
- ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
+ ((BasicContentHeaderProperties)payload.getMessageHeader().properties).setHeaders(headers);
AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
queues.add(q);
- payload.enqueue(queues);
+ return queues;
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java Mon Jul 20 19:05:05 2009
@@ -23,14 +23,15 @@
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
public class TestExchange implements Exchange
{
@@ -63,6 +64,16 @@
return false;
}
+ public boolean isBound(String bindingKey, AMQQueue queue)
+ {
+ return false;
+ }
+
+ public boolean isBound(String bindingKey)
+ {
+ return false;
+ }
+
public void initialise(VirtualHost host, AMQShortString name, boolean durable, boolean autoDelete)
throws AMQException
{
@@ -102,8 +113,9 @@
{
}
- public void route(IncomingMessage message) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage message) throws AMQException
{
+ return new ArrayList<AMQQueue>();
}
public int getTicket()
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Mon Jul 20 19:05:05 2009
@@ -57,6 +57,7 @@
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.message.ServerMessage;
public class AMQChannel
{
@@ -157,27 +158,35 @@
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
throws AMQException
{
- if (_currentMessage == null)
- {
- throw new AMQException("Received content header without previously receiving a BasicPublish frame");
- }
- else
+ StoreContext.setCurrentContext(_storeContext);
+ try
{
- if (_log.isDebugEnabled())
+ if (_currentMessage == null)
{
- _log.debug("Content header received on channel " + _channelId);
+ throw new AMQException("Received content header without previously receiving a BasicPublish frame");
}
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Content header received on channel " + _channelId);
+ }
- _currentMessage.setContentHeaderBody(contentHeaderBody);
+ _currentMessage.setContentHeaderBody(contentHeaderBody);
- _currentMessage.setExpiration();
+ _currentMessage.setExpiration();
- routeCurrentMessage();
+ routeCurrentMessage();
- _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
+ _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
- deliverCurrentMessageIfComplete();
+ deliverCurrentMessageIfComplete();
+ }
+ }
+ finally
+ {
+ StoreContext.clearCurrentContext();
}
}
@@ -212,6 +221,7 @@
public void publishContentBody(ContentBody contentBody) throws AMQException
{
+ StoreContext.setCurrentContext(_storeContext);
if (_currentMessage == null)
{
throw new AMQException("Received content body without previously receiving a JmsPublishBody");
@@ -231,6 +241,7 @@
_session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
contentBody));
+
deliverCurrentMessageIfComplete();
}
catch (AMQException e)
@@ -240,6 +251,10 @@
_currentMessage = null;
throw e;
}
+ finally
+ {
+ StoreContext.clearCurrentContext();
+ }
}
protected void routeCurrentMessage() throws AMQException
@@ -425,7 +440,7 @@
{
if (entry.getQueue() == null)
{
- _log.debug("Adding unacked message with a null queue:" + entry.debugIdentity());
+ _log.debug("Adding unacked message with a null queue:" + entry);
}
else
{
@@ -487,7 +502,7 @@
if (!unacked.isQueueDeleted())
{
// Mark message redelivered
- unacked.getMessage().setRedelivered(true);
+ unacked.setRedelivered(true);
// Ensure message is released for redelivery
unacked.release();
@@ -518,7 +533,7 @@
if (unacked != null)
{
// Mark message redelivered
- unacked.getMessage().setRedelivered(true);
+ unacked.setRedelivered(true);
// Ensure message is released for redelivery
if (!unacked.isQueueDeleted())
@@ -551,7 +566,7 @@
}
else
{
- _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
+ _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
unacked.discard(_storeContext);
@@ -612,7 +627,7 @@
- AMQMessage msg = message.getMessage();
+ ServerMessage msg = message.getMessage();
AMQQueue queue = message.getQueue();
// Our Java Client will always suspend the channel when resending!
@@ -631,7 +646,7 @@
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
- msg.setRedelivered(true);
+ message.setRedelivered(true);
Subscription sub = message.getDeliveredSubscription();
@@ -829,14 +844,25 @@
{
if (!_returnMessages.isEmpty())
{
+ StoreContext sc =StoreContext.setCurrentContext(_storeContext);
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
- AMQMessage message = bouncedMessage.getAMQMessage();
- _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
+ ServerMessage serverMessage = bouncedMessage.getAMQMessage();
+ if(serverMessage instanceof AMQMessage)
+ {
+ AMQMessage message = (AMQMessage) serverMessage;
+ _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+ new AMQShortString(bouncedMessage.getMessage()));
- message.decrementReference(_storeContext);
+ }
+ else
+ {
+ // TODO AMQP 0-10 Message
+ throw new RuntimeException("not yet implemented conversion of 0-10 messages");
+ }
+ bouncedMessage.release();
}
+ StoreContext.setCurrentContext(sc);
_returnMessages.clear();
}
@@ -884,8 +910,18 @@
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
- getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag());
+ ServerMessage msg = entry.getMessage();
+ if(msg instanceof AMQMessage)
+ {
+ getProtocolSession().getProtocolOutputConverter().writeDeliver((AMQMessage)msg, getChannelId(),
+ deliveryTag, sub.getConsumerTag());
+ }
+ else
+ {
+ //TODO - Convert 0-10 Message into 0-8/9 message
+ }
}
+
};
public ClientDeliveryMethod getClientDeliveryMethod()
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java Mon Jul 20 19:05:05 2009
@@ -26,6 +26,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -56,9 +57,8 @@
public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
{
-
- AMQMessage msg = message.getMessage();
- msg.setRedelivered(true);
+
+ message.setRedelivered(true);
final Subscription subscription = message.getDeliveredSubscription();
if (subscription != null)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Mon Jul 20 19:05:05 2009
@@ -45,6 +45,9 @@
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.NewThreadExecutor;
import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.*;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
@@ -56,6 +59,9 @@
import org.apache.qpid.server.protocol.AMQPProtocolProvider;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.transport.ServerConnection;
/**
* Main entry point for AMQPD.
@@ -314,6 +320,29 @@
}
bind(port, serverConfig);
+
+
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+
+ final ConnectionDelegate delegate =
+ new org.apache.qpid.server.transport.ServerConnectionDelegate(appRegistry, "localhost");
+
+
+ ConnectionBinding cb = new ConnectionBinding()
+ {
+ public Connection connection()
+ {
+ ServerConnection conn = new ServerConnection();
+ conn.setConnectionDelegate(delegate);
+ return conn;
+ }
+ };
+
+ int port_0_10 = port + 1;
+
+ org.apache.qpid.transport.network.io.IoAcceptor ioa = new org.apache.qpid.transport.network.io.IoAcceptor
+ ("0.0.0.0", port_0_10, cb);
+ ioa.start();
}
/**
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Mon Jul 20 19:05:05 2009
@@ -23,6 +23,9 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessageReference;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
/**
* Signals that a required delivery could not be made. This could be bacuse of the immediate flag being set and the
@@ -39,9 +42,9 @@
*/
public abstract class RequiredDeliveryException extends AMQException
{
- private AMQMessage _amqMessage;
+ private MessageReference _amqMessage;
- public RequiredDeliveryException(String message, AMQMessage payload)
+ public RequiredDeliveryException(String message, ServerMessage payload)
{
super(message);
@@ -54,20 +57,20 @@
super(message);
}
- public void setMessage(final AMQMessage payload)
+ public void setMessage(final ServerMessage payload)
{
// Increment the reference as this message is in the routing phase
// and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
// handler. So increment here.
- _amqMessage = payload.takeReference();
+ _amqMessage = payload.newReference();
}
- public AMQMessage getAMQMessage()
+ public ServerMessage getAMQMessage()
{
- return _amqMessage;
+ return _amqMessage.getMessage();
}
public AMQConstant getErrorCode()
@@ -76,4 +79,9 @@
}
public abstract AMQConstant getReplyCode();
+
+ public void release()
+ {
+ _amqMessage.release();
+ }
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Mon Jul 20 19:05:05 2009
@@ -130,7 +130,7 @@
//in memory (persistent changes will be rolled back by store)
for (QueueEntry msg : _unacked.values())
{
- msg.getMessage().takeReference();
+ // TODO - should requeue, whole thing is messed up
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Mon Jul 20 19:05:05 2009
@@ -39,6 +39,7 @@
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -204,4 +205,15 @@
{
return getVirtualHost().getQueueRegistry();
}
+
+ public boolean isBound(String bindingKey, AMQQueue queue)
+ {
+ return isBound(new AMQShortString(bindingKey), queue);
+ }
+
+ public boolean isBound(String bindingKey)
+ {
+ return isBound(new AMQShortString(bindingKey));
+ }
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Mon Jul 20 19:05:05 2009
@@ -24,7 +24,6 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -57,6 +56,11 @@
new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this);
}
+ public Exchange getExchange(String exchangeName)
+ {
+ return getExchange(new AMQShortString(exchangeName));
+ }
+
public MessageStore getMessageStore()
{
return _host.getMessageStore();
@@ -134,6 +138,6 @@
{
throw new AMQException("Exchange '" + exchange + "' does not exist");
}
- exch.route(payload);
+ payload.enqueue(exch.route(payload));
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Mon Jul 20 19:05:05 2009
@@ -40,9 +40,9 @@
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
public class DirectExchange extends AbstractExchange
{
@@ -192,10 +192,10 @@
}
}
- public void route(IncomingMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
{
- final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey();
+ final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : new AMQShortString(payload.getRoutingKey());
final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
@@ -204,7 +204,8 @@
_logger.debug("Publishing message to queue " + queues);
}
- payload.enqueue(queues);
+ return queues;
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Mon Jul 20 19:05:05 2009
@@ -24,12 +24,11 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
-import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
public interface Exchange
{
@@ -54,7 +53,7 @@
void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
- void route(IncomingMessage message) throws AMQException;
+ ArrayList<AMQQueue> route(InboundMessage message) throws AMQException;
/**
@@ -93,6 +92,8 @@
*/
boolean hasBindings();
-
+ boolean isBound(String bindingKey, AMQQueue queue);
+
+ boolean isBound(String bindingKey);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Mon Jul 20 19:05:05 2009
@@ -48,4 +48,6 @@
Collection<AMQShortString> getExchangeNames();
void initialise() throws AMQException;
+
+ Exchange getExchange(String exchangeName);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Mon Jul 20 19:05:05 2009
@@ -28,9 +28,9 @@
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -182,7 +182,7 @@
}
}
- public void route(IncomingMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
{
@@ -191,7 +191,7 @@
_logger.debug("Publishing message to queue " + _queues);
}
- payload.enqueue(new ArrayList(_queues));
+ return new ArrayList(_queues);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Mon Jul 20 19:05:05 2009
@@ -28,6 +28,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.framing.AMQTypedValue;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.message.AMQMessageHeader;
/**
* Defines binding and matching based on a set of headers.
@@ -139,7 +140,7 @@
* @return true if the headers define any required keys and match any required
* values
*/
- public boolean matches(FieldTable headers)
+ public boolean matches(AMQMessageHeader headers)
{
if(headers == null)
{
@@ -151,13 +152,13 @@
}
}
- private boolean and(FieldTable headers)
+ private boolean and(AMQMessageHeader headers)
{
- if(headers.keys().containsAll(required))
+ if(headers.containsHeaders(required))
{
for(Map.Entry<String, Object> e : matches.entrySet())
{
- if(!e.getValue().equals(headers.getObject(e.getKey())))
+ if(!e.getValue().equals(headers.getHeader(e.getKey())))
{
return false;
}
@@ -171,11 +172,11 @@
}
- private boolean or(final FieldTable headers)
+ private boolean or(final AMQMessageHeader headers)
{
- if(required.isEmpty() || !(Boolean) headers.processOverElements(new RequiredOrProcessor()))
+ if(required.isEmpty() || passesRequiredOr(headers))
{
- return ((!matches.isEmpty()) && (Boolean) headers.processOverElements(new MatchesOrProcessor()))
+ return ((!matches.isEmpty()) && passesMatchesOr(headers))
|| (required.isEmpty() && matches.isEmpty());
}
else
@@ -184,6 +185,32 @@
}
}
+ private boolean passesMatchesOr(AMQMessageHeader headers)
+ {
+ for(Map.Entry<String,Object> entry : matches.entrySet())
+ {
+ if(!headers.containsHeader(entry.getKey())
+ || !((entry.getValue() == null && headers.getHeader(entry.getKey()) == null)
+ || (entry.getValue().equals(headers.getHeader(entry.getKey())))))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean passesRequiredOr(AMQMessageHeader headers)
+ {
+ for(String name : required)
+ {
+ if(headers.containsHeader(name))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
private void processSpecial(String key, Object value)
{
if("X-match".equalsIgnoreCase(key))
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Mon Jul 20 19:05:05 2009
@@ -31,9 +31,10 @@
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
import javax.management.JMException;
import javax.management.openmbean.ArrayType;
@@ -50,7 +51,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -237,31 +237,31 @@
}
}
- public void route(IncomingMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
{
- FieldTable headers = getHeaders(payload.getContentHeaderBody());
+ AMQMessageHeader header = payload.getMessageHeader();
if (_logger.isDebugEnabled())
{
- _logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
+ _logger.debug("Exchange " + getName() + ": routing message with headers " + header);
}
boolean routed = false;
ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
for (Registration e : _bindings)
{
- if (e.binding.matches(headers))
+ if (e.binding.matches(header))
{
if (_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getName() + ": delivering message with headers " +
- headers + " to " + e.queue.getName());
+ header + " to " + e.queue.getName());
}
queues.add(e.queue);
routed = true;
}
}
- payload.enqueue(queues);
+ return queues;
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Mon Jul 20 19:05:05 2009
@@ -30,13 +30,13 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortStringTokenizer;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.topic.TopicParser;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.message.InboundMessage;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -109,7 +109,7 @@
private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
- private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>();
+ private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
public static class Binding
{
@@ -160,7 +160,7 @@
private final class TopicExchangeResult implements TopicMatcherResult
{
private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
- private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>();
+ private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
public void addUnfilteredQueue(AMQQueue queue)
{
@@ -190,12 +190,12 @@
}
- public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+ public void addFilteredQueue(AMQQueue queue, MessageFilter filter)
{
- Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+ Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
if(filters == null)
{
- filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>();
+ filters = new ConcurrentHashMap<MessageFilter,Integer>();
_filteredQueues.put(queue, filters);
}
Integer instances = filters.get(filter);
@@ -210,9 +210,9 @@
}
- public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+ public void removeFilteredQueue(AMQQueue queue, MessageFilter filter)
{
- Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+ Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
if(filters != null)
{
Integer instances = filters.get(filter);
@@ -237,11 +237,11 @@
}
public void replaceQueueFilter(AMQQueue queue,
- MessageFilter<RuntimeException> oldFilter,
- MessageFilter<RuntimeException> newFilter)
+ MessageFilter oldFilter,
+ MessageFilter newFilter)
{
- Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
- Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters);
+ Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
+ Map<MessageFilter,Integer> newFilters = new ConcurrentHashMap<MessageFilter,Integer>(filters);
Integer oldFilterInstances = filters.get(oldFilter);
if(oldFilterInstances == 1)
{
@@ -263,7 +263,7 @@
_filteredQueues.put(queue,newFilters);
}
- public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
+ public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues)
{
if(queues == null)
{
@@ -284,11 +284,11 @@
queues.addAll(_unfilteredQueues.keySet());
if(!_filteredQueues.isEmpty())
{
- for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet())
+ for(Map.Entry<AMQQueue, Map<MessageFilter, Integer>> entry : _filteredQueues.entrySet())
{
if(!queues.contains(entry.getKey()))
{
- for(MessageFilter<RuntimeException> filter : entry.getValue().keySet())
+ for(MessageFilter filter : entry.getValue().keySet())
{
if(filter.matches(msg))
{
@@ -456,18 +456,18 @@
}
- private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args)
+ private JMSSelectorFilter createSelectorFilter(final FieldTable args)
throws AMQException
{
final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
- WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString);
+ WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
JMSSelectorFilter selector = null;
if(selectorRef == null || (selector = selectorRef.get())==null)
{
- selector = new JMSSelectorFilter<RuntimeException>(selectorString);
- _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
+ selector = new JMSSelectorFilter(selectorString);
+ _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
}
return selector;
}
@@ -528,10 +528,12 @@
return normalizedString;
}
- public void route(IncomingMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
{
- final AMQShortString routingKey = payload.getRoutingKey();
+ final AMQShortString routingKey = payload.getRoutingKey() == null
+ ? AMQShortString.EMPTY_STRING
+ : new AMQShortString(payload.getRoutingKey());
// The copy here is unfortunate, but not too bad relevant to the amount of
// things created and copied in getMatchedQueues
@@ -543,7 +545,7 @@
_logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
}
- payload.enqueue(queues);
+ return queues;
}
@@ -646,7 +648,7 @@
}
}
- private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
+ private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
{
Collection<TopicMatcherResult> results = _parser.parse(routingKey);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java Mon Jul 20 19:05:05 2009
@@ -26,7 +26,7 @@
/**
* An expression which performs an operation on two expression values
*/
-public abstract class ArithmeticExpression<E extends Exception> extends BinaryExpression<E>
+public abstract class ArithmeticExpression extends BinaryExpression
{
protected static final int INTEGER = 1;
@@ -248,7 +248,7 @@
}
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Object lvalue = left.evaluate(message);
if (lvalue == null)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java Mon Jul 20 19:05:05 2009
@@ -23,23 +23,23 @@
/**
* An expression which performs an operation on two expression values.
*/
-public abstract class BinaryExpression<E extends Exception> implements Expression<E>
+public abstract class BinaryExpression implements Expression
{
- protected Expression<E> left;
- protected Expression<E> right;
+ protected Expression left;
+ protected Expression right;
- public BinaryExpression(Expression<E> left, Expression<E> right)
+ public BinaryExpression(Expression left, Expression right)
{
this.left = left;
this.right = right;
}
- public Expression<E> getLeft()
+ public Expression getLeft()
{
return left;
}
- public Expression<E> getRight()
+ public Expression getRight()
{
return right;
}
@@ -90,7 +90,7 @@
/**
* @param expression
*/
- public void setRight(Expression<E> expression)
+ public void setRight(Expression expression)
{
right = expression;
}
@@ -98,7 +98,7 @@
/**
* @param expression
*/
- public void setLeft(Expression<E> expression)
+ public void setLeft(Expression expression)
{
left = expression;
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java Mon Jul 20 19:05:05 2009
@@ -28,14 +28,13 @@
* A BooleanExpression is an expression that always
* produces a Boolean result.
*/
-public interface BooleanExpression<E extends Exception> extends Expression<E>
+public interface BooleanExpression extends Expression
{
/**
* @param message
* @return true if the expression evaluates to Boolean.TRUE.
- * @throws E
*/
- public boolean matches(Filterable<E> message) throws E;
+ public boolean matches(Filterable message);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java Mon Jul 20 19:05:05 2009
@@ -34,15 +34,15 @@
/**
* A filter performing a comparison of two objects
*/
-public abstract class ComparisonExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E>
+public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression
{
- public static<E extends Exception> BooleanExpression<E> createBetween(Expression<E> value, Expression left, Expression<E> right)
+ public static BooleanExpression createBetween(Expression value, Expression left, Expression right)
{
return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right));
}
- public static<E extends Exception> BooleanExpression<E> createNotBetween(Expression<E> value, Expression<E> left, Expression<E> right)
+ public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right)
{
return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
}
@@ -73,7 +73,7 @@
REGEXP_CONTROL_CHARS.add(new Character('!'));
}
- static class LikeExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E>
+ static class LikeExpression extends UnaryExpression implements BooleanExpression
{
Pattern likePattern;
@@ -81,7 +81,7 @@
/**
* @param right
*/
- public LikeExpression(Expression<E> right, String like, int escape)
+ public LikeExpression(Expression right, String like, int escape)
{
super(right);
@@ -138,7 +138,7 @@
/**
* org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
*/
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Object rv = this.getRight().evaluate(message);
@@ -158,7 +158,7 @@
return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
}
- public boolean matches(Filterable<E> message) throws E
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
@@ -236,7 +236,7 @@
return doCreateEqual(left, right);
}
- private static<E extends Exception> BooleanExpression<E> doCreateEqual(Expression<E> left, Expression<E> right)
+ private static BooleanExpression doCreateEqual(Expression left, Expression right)
{
return new EqualExpression(left, right);
}
@@ -388,7 +388,7 @@
super(left, right);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Comparable lv = (Comparable) left.evaluate(message);
if (lv == null)
@@ -550,21 +550,21 @@
protected abstract boolean asBoolean(int answer);
- public boolean matches(Filterable<E> message) throws E
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
return (object != null) && (object == Boolean.TRUE);
}
- private static class EqualExpression<E extends Exception> extends ComparisonExpression<E>
+ private static class EqualExpression extends ComparisonExpression
{
- public EqualExpression(final Expression<E> left, final Expression<E> right)
+ public EqualExpression(final Expression left, final Expression right)
{
super(left, right);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Object lv = left.evaluate(message);
Object rv = right.evaluate(message);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java Mon Jul 20 19:05:05 2009
@@ -32,17 +32,17 @@
/**
* Represents a constant expression
*/
-public class ConstantExpression<E extends Exception> implements Expression<E>
+public class ConstantExpression implements Expression
{
- static class BooleanConstantExpression<E extends Exception> extends ConstantExpression<E> implements BooleanExpression<E>
+ static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression
{
public BooleanConstantExpression(Object value)
{
super(value);
}
- public boolean matches(Filterable<E> message) throws E
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
@@ -121,7 +121,7 @@
this.value = value;
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
return value;
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java Mon Jul 20 19:05:05 2009
@@ -27,12 +27,12 @@
/**
* Represents an expression
*/
-public interface Expression<E extends Exception>
+public interface Expression
{
/**
* @return the value of this expression
*/
- public Object evaluate(Filterable<E> message) throws E;
+ public Object evaluate(Filterable message);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java Mon Jul 20 19:05:05 2009
@@ -27,13 +27,13 @@
import org.apache.qpid.server.queue.Filterable;
import org.apache.qpid.AMQException;
-public interface FilterManager<E extends Exception>
+public interface FilterManager
{
- void add(MessageFilter<E> filter);
+ void add(MessageFilter filter);
- void remove(MessageFilter<E> filter);
+ void remove(MessageFilter filter);
- boolean allAllow(Filterable<E> msg);
+ boolean allAllow(Filterable msg);
boolean hasFilters();
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Mon Jul 20 19:05:05 2009
@@ -26,12 +26,12 @@
import org.apache.qpid.server.queue.Filterable;
-public class JMSSelectorFilter<E extends Exception> implements MessageFilter<E>
+public class JMSSelectorFilter implements MessageFilter
{
private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
private String _selector;
- private BooleanExpression<E> _matcher;
+ private BooleanExpression _matcher;
public JMSSelectorFilter(String selector) throws AMQException
{
@@ -39,7 +39,7 @@
_matcher = new SelectorParser().parse(selector);
}
- public boolean matches(Filterable<E> message) throws E
+ public boolean matches(Filterable message)
{
boolean match = _matcher.matches(message);
if(_logger.isDebugEnabled())
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java Mon Jul 20 19:05:05 2009
@@ -27,15 +27,15 @@
/**
* A filter performing a comparison of two objects
*/
-public abstract class LogicExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E>
+public abstract class LogicExpression extends BinaryExpression implements BooleanExpression
{
- public static<E extends Exception> BooleanExpression createOR(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue)
+ public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue)
{
return new OrExpression(lvalue, rvalue);
}
- public static<E extends Exception> BooleanExpression createAND(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue)
+ public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue)
{
return new AndExpression(lvalue, rvalue);
}
@@ -49,23 +49,23 @@
super(left, right);
}
- public abstract Object evaluate(Filterable<E> message) throws E;
+ public abstract Object evaluate(Filterable message);
- public boolean matches(Filterable<E> message) throws E
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
return (object != null) && (object == Boolean.TRUE);
}
- private static class OrExpression<E extends Exception> extends LogicExpression<E>
+ private static class OrExpression extends LogicExpression
{
- public OrExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue)
+ public OrExpression(final BooleanExpression lvalue, final BooleanExpression rvalue)
{
super(lvalue, rvalue);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Boolean lv = (Boolean) left.evaluate(message);
@@ -86,14 +86,14 @@
}
}
- private static class AndExpression<E extends Exception> extends LogicExpression<E>
+ private static class AndExpression extends LogicExpression
{
- public AndExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue)
+ public AndExpression(final BooleanExpression lvalue, final BooleanExpression rvalue)
{
super(lvalue, rvalue);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Boolean lv = (Boolean) left.evaluate(message);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Mon Jul 20 19:05:05 2009
@@ -24,7 +24,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
-public interface MessageFilter<E extends Exception>
+public interface MessageFilter
{
- boolean matches(Filterable<E> message) throws E;
+ boolean matches(Filterable message);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Mon Jul 20 19:05:05 2009
@@ -27,7 +27,6 @@
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.server.queue.Filterable;
@@ -35,7 +34,7 @@
/**
* Represents a property expression
*/
-public class PropertyExpression<E extends Exception> implements Expression<E>
+public class PropertyExpression implements Expression
{
// Constants - defined the same as JMS
private static final int NON_PERSISTENT = 1;
@@ -44,12 +43,12 @@
private static final Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
- private static final HashMap<String, Expression<? extends Exception>> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression<? extends Exception>>();
+ private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>();
{
- JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression<E>()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression()
{
- public Object evaluate(Filterable<E> message)
+ public Object evaluate(Filterable message)
{
//TODO
return null;
@@ -73,9 +72,9 @@
JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression());
- JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression<E>()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression()
{
- public Object evaluate(Filterable message) throws E
+ public Object evaluate(Filterable message)
{
return message.isRedelivered();
}
@@ -83,7 +82,7 @@
}
private final String name;
- private final Expression<E> jmsPropertyExpression;
+ private final Expression jmsPropertyExpression;
public boolean outerTest()
{
@@ -96,10 +95,10 @@
- jmsPropertyExpression = (Expression<E>) JMS_PROPERTY_EXPRESSIONS.get(name);
+ jmsPropertyExpression = (Expression) JMS_PROPERTY_EXPRESSIONS.get(name);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
if (jmsPropertyExpression != null)
@@ -108,17 +107,7 @@
}
else
{
-
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Looking up property:" + name);
- _logger.debug("Properties are:" + _properties.getHeaders().keySet());
- }
-
- return _properties.getHeaders().getObject(name);
+ return message.getMessageHeader().getHeader(name);
}
}
@@ -158,39 +147,30 @@
}
- private static class ReplyToExpression<E extends Exception> implements Expression<E>
+ private static class ReplyToExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
-
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
- AMQShortString replyTo = _properties.getReplyTo();
-
- return (replyTo == null) ? null : replyTo.toString();
-
+ String replyTo = message.getMessageHeader().getReplyTo();
+ return replyTo;
}
}
- private static class TypeExpression<E extends Exception> implements Expression<E>
+ private static class TypeExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
- AMQShortString type = _properties.getType();
- return (type == null) ? null : type.toString();
+ String type = message.getMessageHeader().getType();
+ return type;
}
}
- private static class DeliveryModeExpression<E extends Exception> implements Expression<E>
+ private static class DeliveryModeExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
if (_logger.isDebugEnabled())
@@ -202,68 +182,53 @@
}
}
- private static class PriorityExpression<E extends Exception> implements Expression<E>
+ private static class PriorityExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
-
- return (int) _properties.getPriority();
+ byte priority = message.getMessageHeader().getPriority();
+ return (int) priority;
}
}
- private static class MessageIDExpression<E extends Exception> implements Expression<E>
+ private static class MessageIDExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
- AMQShortString messageId = _properties.getMessageId();
+ String messageId = message.getMessageHeader().getMessageId();
- return (messageId == null) ? null : messageId;
+ return messageId;
}
}
- private static class TimestampExpression<E extends Exception> implements Expression<E>
+ private static class TimestampExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
-
- return _properties.getTimestamp();
+ long timestamp = message.getMessageHeader().getTimestamp();
+ return timestamp;
}
}
- private static class CorrelationIdExpression<E extends Exception> implements Expression<E>
+ private static class CorrelationIdExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
- AMQShortString correlationId = _properties.getCorrelationId();
- return (correlationId == null) ? null : correlationId.toString();
+ String correlationId = message.getMessageHeader().getCorrelationId();
+
+ return correlationId;
}
}
- private static class ExpirationExpression<E extends Exception> implements Expression<E>
+ private static class ExpirationExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
-
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
-
- return _properties.getExpiration();
+ long expiration = message.getMessageHeader().getExpiration();
+ return expiration;
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java Mon Jul 20 19:05:05 2009
@@ -27,43 +27,34 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
-public class SimpleFilterManager implements FilterManager<AMQException>
+public class SimpleFilterManager implements FilterManager
{
private final Logger _logger = Logger.getLogger(SimpleFilterManager.class);
- private final ConcurrentLinkedQueue<MessageFilter<AMQException>> _filters;
+ private final ConcurrentLinkedQueue<MessageFilter> _filters;
public SimpleFilterManager()
{
_logger.debug("Creating SimpleFilterManager");
- _filters = new ConcurrentLinkedQueue<MessageFilter<AMQException>>();
+ _filters = new ConcurrentLinkedQueue<MessageFilter>();
}
- public void add(MessageFilter<AMQException> filter)
+ public void add(MessageFilter filter)
{
_filters.add(filter);
}
- public void remove(MessageFilter<AMQException> filter)
+ public void remove(MessageFilter filter)
{
_filters.remove(filter);
}
- public boolean allAllow(Filterable<AMQException> msg)
+ public boolean allAllow(Filterable msg)
{
- for (MessageFilter<AMQException> filter : _filters)
+ for (MessageFilter filter : _filters)
{
- try
+ if (!filter.matches(msg))
{
- if (!filter.matches(msg))
- {
- return false;
- }
- }
- catch (AMQException e)
- {
- //fixme
- e.printStackTrace();
return false;
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java Mon Jul 20 19:05:05 2009
@@ -35,18 +35,18 @@
/**
* An expression which performs an operation on two expression values
*/
-public abstract class UnaryExpression<E extends Exception> implements Expression<E>
+public abstract class UnaryExpression implements Expression
{
private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE);
- protected Expression<E> right;
+ protected Expression right;
- public static<E extends Exception> Expression<E> createNegate(Expression<E> left)
+ public static Expression createNegate(Expression left)
{
return new NegativeExpression(left);
}
- public static<E extends Exception> BooleanExpression createInExpression(PropertyExpression<E> right, List elements, final boolean not)
+ public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not)
{
// Use a HashSet if there are many elements.
@@ -69,14 +69,14 @@
return new InExpression(right, inList, not);
}
- abstract static class BooleanUnaryExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E>
+ abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression
{
- public BooleanUnaryExpression(Expression<E> left)
+ public BooleanUnaryExpression(Expression left)
{
super(left);
}
- public boolean matches(Filterable<E> message) throws E
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
@@ -85,7 +85,7 @@
}
;
- public static<E extends Exception> BooleanExpression<E> createNOT(BooleanExpression<E> left)
+ public static<E extends Exception> BooleanExpression createNOT(BooleanExpression left)
{
return new NotExpression(left);
}
@@ -100,7 +100,7 @@
return new XQueryExpression(xpath);
}
- public static<E extends Exception> BooleanExpression createBooleanCast(Expression<E> left)
+ public static<E extends Exception> BooleanExpression createBooleanCast(Expression left)
{
return new BooleanCastExpression(left);
}
@@ -151,7 +151,7 @@
this.right = left;
}
- public Expression<E> getRight()
+ public Expression getRight()
{
return right;
}
@@ -204,14 +204,14 @@
*/
public abstract String getExpressionSymbol();
- private static class NegativeExpression<E extends Exception> extends UnaryExpression<E>
+ private static class NegativeExpression extends UnaryExpression
{
- public NegativeExpression(final Expression<E> left)
+ public NegativeExpression(final Expression left)
{
super(left);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Object rvalue = right.evaluate(message);
if (rvalue == null)
@@ -233,19 +233,19 @@
}
}
- private static class InExpression<E extends Exception> extends BooleanUnaryExpression<E>
+ private static class InExpression extends BooleanUnaryExpression
{
private final Collection _inList;
private final boolean _not;
- public InExpression(final PropertyExpression<E> right, final Collection inList, final boolean not)
+ public InExpression(final PropertyExpression right, final Collection inList, final boolean not)
{
super(right);
_inList = inList;
_not = not;
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Object rvalue = right.evaluate(message);
@@ -309,14 +309,14 @@
}
}
- private static class NotExpression<E extends Exception> extends BooleanUnaryExpression<E>
+ private static class NotExpression extends BooleanUnaryExpression
{
- public NotExpression(final BooleanExpression<E> left)
+ public NotExpression(final BooleanExpression left)
{
super(left);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Boolean lvalue = (Boolean) right.evaluate(message);
if (lvalue == null)
@@ -333,14 +333,14 @@
}
}
- private static class BooleanCastExpression<E extends Exception> extends BooleanUnaryExpression<E>
+ private static class BooleanCastExpression extends BooleanUnaryExpression
{
- public BooleanCastExpression(final Expression<E> left)
+ public BooleanCastExpression(final Expression left)
{
super(left);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Object rvalue = right.evaluate(message);
if (rvalue == null)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java Mon Jul 20 19:05:05 2009
@@ -71,7 +71,7 @@
private final XPathEvaluator evaluator;
static public interface XPathEvaluator {
- public boolean evaluate(Filterable message) throws AMQException;
+ public boolean evaluate(Filterable message);
}
XPathExpression(String xpath) {
@@ -93,7 +93,7 @@
}
}
- public Object evaluate(Filterable message) throws AMQException {
+ public Object evaluate(Filterable message) {
// try {
//FIXME this is flow to disk work
// if( message.isDropped() )
@@ -118,7 +118,7 @@
* @return true if the expression evaluates to Boolean.TRUE.
* @throws AMQException
*/
- public boolean matches(Filterable message) throws AMQException
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org