You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2006/12/19 11:51:42 UTC
svn commit: r488624 [1/2] - in /incubator/qpid/trunk/qpid: java/broker/
java/broker/src/main/grammar/
java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/filter/
java/broker/src/main/java/org/apache/qpid/s...
Author: ritchiem
Date: Tue Dec 19 02:51:39 2006
New Revision: 488624
URL: http://svn.apache.org/viewvc?view=rev&rev=488624
Log:
QPID-21
Added:
SelectorParser.jj - ActiveMQ selector javacc grammar used to generate SelectorParser.java
server/filter - Selector Filtering code from ActiveMQ project adjusted to suite our class and package structure.
server/message - Decorator classes to allow access to the JMSMessage inside the AMQMessage
ConcurrentSelectorDeliveryManager.java - A new DeliveryManager that utilises PreDeliveryQueues to implement selectors
AMQInvalidSelectorException.java - thrown on client and broker when the Selector text is invalid.
Common: log4j.properties to remove error log4j warnings on Common tests.
Modified:
broker/pom.xml - to generate SelectorParser.java
AMQChannel.java - Addition of argument fieldtable for filter setup.
BasicConsumeMethodHandler.java - writing of InvalidSelector channel close exception.
AMQMessage.java - Added decorator to get access to the enclosed JMSMessage
AMQQueue.java - Enhanced 'deliverymanager' property to allow the selection of the ConcurrentSelectorDeliveryManager.
Subscription.java - Enhanced interface to allow a subscription to state an 'interest' in a given message.
SubscriptionFactory.java - Added method to allow passing of filter arguments.
SubscriptionImpl.java - Implemented new Subscription.java methods.
SubscriptionManager.java - Added ability to get a list of current subscribers.
SubscriptionSet.java - augmented nextSubscriber to allow the subscriber to exert the new hasInterest feature.
SynchronizedDeliveryManager.java - fixed Logging class
AMQSession - Added filter extraction from consume call and pass it on to the registration.
ChannelCloseMethodHandler.java - Handle the reception and correct raising of the InvalidSelector Exception
AbstractJMSMessage.java - Expanded imports
BlockingMethodFrameListener.java - added extra info to a debug output line.
SocketTransportConnection.java - made output an info not a warn.
PropertiesFileInitialContextFactory.java - updated to allow the PROVIDER_URL to specify a property file to read in for the initial values.
ClusteredSubscriptionManager.java - Implementation of SubscriptionSet.java
NestedSubscriptionManager.java - Implementation of SubscriptionManager.java
RemoteSubscriptionImpl.java - Implementation Subscription.java
AMQConstant.java - Added '322' "Invalid Selector"
SubscriptionTestHelper.java - Implementation of Subscription.java
Edited specs/amqp-8.0.xml to add field table to consume method.
Thanks to the ActiveMQ project for writing the initial SelectorParser.jj and associated filter Expressions.
Added:
incubator/qpid/trunk/qpid/java/broker/src/main/grammar/
- copied from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/grammar/
incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/grammar/SelectorParser.jj
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/
- copied from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
- copied, changed from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
- copied, changed from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
- copied, changed from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
- copied, changed from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/
- copied from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/
- copied from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms/
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
- copied, changed from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties (with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
- copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
Modified:
incubator/qpid/trunk/qpid/java/broker/pom.xml
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
incubator/qpid/trunk/qpid/specs/amqp-8.0.xml
Modified: incubator/qpid/trunk/qpid/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/pom.xml?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/broker/pom.xml Tue Dec 19 02:51:39 2006
@@ -55,6 +55,10 @@
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-filter-ssl</artifactId>
</dependency>
@@ -82,6 +86,25 @@
<build>
<plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javacc-maven-plugin</artifactId>
+ <version>2.0</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <configuration>
+ <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory>
+ <outputDirectory>${basedir}/target/generated</outputDirectory>
+ <packageName>org.apache.qpid.server.filter.jms.selector</packageName>
+ </configuration>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Dec 19 02:51:39 2006
@@ -26,6 +26,7 @@
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -290,7 +291,7 @@
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException
+ public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable filters) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -301,7 +302,7 @@
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks);
+ queue.registerProtocolSession(session, _channelId, tag, acks, filters);
_consumerTag2QueueMap.put(tag, queue);
return tag;
}
Copied: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java (from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java?view=diff&rev=488624&p1=incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java&r1=488302&p2=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java&r2=488624
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java Tue Dec 19 02:51:39 2006
@@ -32,4 +32,6 @@
void remove(MessageFilter filter);
boolean allAllow(AMQMessage msg);
+
+ boolean hasFilters();
}
Copied: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java?view=diff&rev=488624&p1=incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java&r1=488302&p2=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java&r2=488624
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java Tue Dec 19 02:51:39 2006
@@ -53,16 +53,29 @@
_logger.info("filter:" + key);
if (key.equals(JMS_SELECTOR_FILTER))
{
- manager.add(new JMSSelectorFilter((String) filters.get(key)));
+ String selector = (String) filters.get(key);
+
+ if (selector != null && !selector.equals(""))
+ {
+ manager.add(new JMSSelectorFilter(selector));
+ }
}
}
+
+ //If we added no filters don't bear the overhead of having an filter manager
+ if (!manager.hasFilters())
+ {
+ manager = null;
+ }
}
else
{
_logger.info("No Filters found.");
}
+
return manager;
+
}
}
Copied: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?view=diff&rev=488624&p1=incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java&r1=488302&p2=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java&r2=488624
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Tue Dec 19 02:51:39 2006
@@ -22,22 +22,17 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.filter.jms.selector.SelectorParser;
-import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.log4j.Logger;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
public class JMSSelectorFilter implements MessageFilter
{
-
private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
-// LoggerFactory.getLogger(JMSSelectorFilter.class);
private String _selector;
private BooleanExpression _matcher;
@@ -47,7 +42,6 @@
_selector = selector;
_logger.info("Created JMSSelectorFilter with selector:" + _selector);
- // BooleanExpression activemqSelctor = new ActiveMQSelectorParser(selector);
try
{
Copied: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java (from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java?view=diff&rev=488624&p1=incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java&r1=488302&p2=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java&r2=488624
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java Tue Dec 19 02:51:39 2006
@@ -69,4 +69,9 @@
}
return true;
}
+
+ public boolean hasFilters()
+ {
+ return !_filters.isEmpty();
+ }
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Dec 19 02:51:39 2006
@@ -21,10 +21,12 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -32,6 +34,7 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.log4j.Logger;
@@ -68,14 +71,14 @@
{
AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
- if(queue == null)
+ if (queue == null)
{
_log.info("No queue for '" + body.queue + "'");
}
try
{
- String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck);
- if(!body.nowait)
+ String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.arguments);
+ if (!body.nowait)
{
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
}
@@ -83,10 +86,19 @@
//now allow queue to start async processing of any backlog of messages
queue.deliverAsync();
}
- catch(ConsumerTagNotUniqueException e)
+ catch (AMQInvalidSelectorException ise)
+ {
+ _log.info("Closing connection due to invalid selector");
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
+ ise.getMessage(), BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
+ }
+ catch (ConsumerTagNotUniqueException e)
{
String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID));
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
+ BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
}
}
}
Copied: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java (from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java?view=diff&rev=488624&p1=incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java&r1=488302&p2=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java&r2=488624
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java Tue Dec 19 02:51:39 2006
@@ -31,12 +31,11 @@
import javax.jms.MessageNotWriteableException;
import java.util.Enumeration;
-public class JMSMessage implements MessageDecorator
+public class JMSMessage implements MessageDecorator
{
private AMQMessage _message;
private BasicContentHeaderProperties _properties;
- private Destination _replyTo;
public JMSMessage(AMQMessage message)
{
@@ -45,13 +44,13 @@
_properties = (BasicContentHeaderProperties) contentHeader.properties;
}
- protected void checkWriteable()
+ protected void checkWriteable() throws MessageNotWriteableException
{
-// The broker should not modify a message.
+ //The broker should not modify a message.
// if (_readableMessage)
-// {
-// throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
-// }
+ {
+ throw new MessageNotWriteableException("The broker should not modify a message.");
+ }
}
@@ -60,9 +59,10 @@
return _properties.getMessageId();
}
- public void setJMSMessageID(String string)
+ public void setJMSMessageID(String string) throws MessageNotWriteableException
{
checkWriteable();
+ _properties.setMessageId(string);
}
public long getJMSTimestamp()
@@ -70,9 +70,10 @@
return _properties.getTimestamp();
}
- public void setJMSTimestamp(long l)
+ public void setJMSTimestamp(long l) throws MessageNotWriteableException
{
checkWriteable();
+ _properties.setTimestamp(l);
}
public byte[] getJMSCorrelationIDAsBytes()
@@ -80,14 +81,14 @@
return _properties.getCorrelationId().getBytes();
}
- public void setJMSCorrelationIDAsBytes(byte[] bytes)
- {
- checkWriteable();
- }
+// public void setJMSCorrelationIDAsBytes(byte[] bytes)
+// {
+// }
- public void setJMSCorrelationID(String string)
+ public void setJMSCorrelationID(String string) throws MessageNotWriteableException
{
checkWriteable();
+ _properties.setCorrelationId(string);
}
public String getJMSCorrelationID()
@@ -100,20 +101,22 @@
return _properties.getReplyTo();
}
- public void setJMSReplyTo(Destination destination)
+ public void setJMSReplyTo(Destination destination) throws MessageNotWriteableException
{
checkWriteable();
+ _properties.setReplyTo(destination.toString());
}
public String getJMSDestination()
{
- //FIXME Currently the Destination has not been defined.
+ //fixme should be a deestination
return "";
}
- public void setJMSDestination(Destination destination)
+ public void setJMSDestination(Destination destination) throws MessageNotWriteableException
{
checkWriteable();
+ //_properties.setDestination(destination.toString());
}
public int getJMSDeliveryMode()
@@ -121,9 +124,10 @@
return _properties.getDeliveryMode();
}
- public void setJMSDeliveryMode(int i)
+ public void setJMSDeliveryMode(byte i) throws MessageNotWriteableException
{
checkWriteable();
+ _properties.setDeliveryMode(i);
}
public boolean getJMSRedelivered()
@@ -131,9 +135,10 @@
return _message.isRedelivered();
}
- public void setJMSRedelivered(boolean b)
+ public void setJMSRedelivered(boolean b) throws MessageNotWriteableException
{
checkWriteable();
+ _message.setRedelivered(b);
}
public String getJMSType()
@@ -141,9 +146,10 @@
return _properties.getType();
}
- public void setJMSType(String string)
+ public void setJMSType(String string) throws MessageNotWriteableException
{
checkWriteable();
+ _properties.setType(string);
}
public long getJMSExpiration()
@@ -151,9 +157,10 @@
return _properties.getExpiration();
}
- public void setJMSExpiration(long l)
+ public void setJMSExpiration(long l) throws MessageNotWriteableException
{
checkWriteable();
+ _properties.setExpiration(l);
}
public int getJMSPriority()
@@ -161,122 +168,133 @@
return _properties.getPriority();
}
- public void setJMSPriority(int i)
+ public void setJMSPriority(byte i) throws MessageNotWriteableException
{
checkWriteable();
+ _properties.setPriority(i);
}
- public void clearProperties()
+ public void clearProperties() throws MessageNotWriteableException
{
checkWriteable();
+ _properties.getJMSHeaders().clear();
}
public boolean propertyExists(String string)
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return _properties.getJMSHeaders().propertyExists(string);
}
- public boolean getBooleanProperty(String string)
+ public boolean getBooleanProperty(String string) throws JMSException
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return _properties.getJMSHeaders().getBoolean(string);
}
- public byte getByteProperty(String string)
+ public byte getByteProperty(String string) throws JMSException
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return _properties.getJMSHeaders().getByte(string);
}
- public short getShortProperty(String string)
+ public short getShortProperty(String string) throws JMSException
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return _properties.getJMSHeaders().getShort(string);
}
- public int getIntProperty(String string)
+ public int getIntProperty(String string) throws JMSException
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return _properties.getJMSHeaders().getInteger(string);
}
- public long getLongProperty(String string)
+ public long getLongProperty(String string) throws JMSException
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return _properties.getJMSHeaders().getLong(string);
}
- public float getFloatProperty(String string)
+ public float getFloatProperty(String string) throws JMSException
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return _properties.getJMSHeaders().getFloat(string);
}
- public double getDoubleProperty(String string)
+ public double getDoubleProperty(String string) throws JMSException
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return _properties.getJMSHeaders().getDouble(string);
}
- public String getStringProperty(String string)
+ public String getStringProperty(String string) throws JMSException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _properties.getJMSHeaders().getString(string);
}
- public Object getObjectProperty(String string)
+ public Object getObjectProperty(String string) throws JMSException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _properties.getJMSHeaders().getObject(string);
}
public Enumeration getPropertyNames()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _properties.getJMSHeaders().getPropertyNames();
}
- public void setBooleanProperty(String string, boolean b)
+ public void setBooleanProperty(String string, boolean b) throws JMSException
{
checkWriteable();
+ _properties.getJMSHeaders().setBoolean(string, b);
}
- public void setByteProperty(String string, byte b)
+ public void setByteProperty(String string, byte b) throws JMSException
{
checkWriteable();
+ _properties.getJMSHeaders().setByte(string, b);
}
- public void setShortProperty(String string, short i)
+ public void setShortProperty(String string, short i) throws JMSException
{
checkWriteable();
+ _properties.getJMSHeaders().setShort(string, i);
}
- public void setIntProperty(String string, int i)
+ public void setIntProperty(String string, int i) throws JMSException
{
checkWriteable();
+ _properties.getJMSHeaders().setInteger(string, i);
}
- public void setLongProperty(String string, long l)
+ public void setLongProperty(String string, long l) throws JMSException
{
checkWriteable();
+ _properties.getJMSHeaders().setLong(string, l);
}
- public void setFloatProperty(String string, float v)
+ public void setFloatProperty(String string, float v) throws JMSException
{
checkWriteable();
+ _properties.getJMSHeaders().setFloat(string, v);
}
- public void setDoubleProperty(String string, double v)
+ public void setDoubleProperty(String string, double v) throws JMSException
{
checkWriteable();
+ _properties.getJMSHeaders().setDouble(string, v);
}
- public void setStringProperty(String string, String string1)
+ public void setStringProperty(String string, String string1) throws JMSException
{
checkWriteable();
+ _properties.getJMSHeaders().setString(string, string1);
}
- public void setObjectProperty(String string, Object object)
+ public void setObjectProperty(String string, Object object) throws JMSException
{
checkWriteable();
+ _properties.getJMSHeaders().setObject(string, object);
}
- public void acknowledge()
+ public void acknowledge() throws MessageNotWriteableException
{
checkWriteable();
}
- public void clearBody()
+ public void clearBody() throws MessageNotWriteableException
{
checkWriteable();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Dec 19 02:51:39 2006
@@ -25,6 +25,8 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.qpid.AMQException;
import java.util.ArrayList;
@@ -33,17 +35,21 @@
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Combines the information that make up a deliverable message into a more manageable form.
*/
public class AMQMessage
{
+ public static final String JMS_MESSAGE = "jms.message";
+
private final Set<Object> _tokens = new HashSet<Object>();
private AMQProtocolSession _publisher;
- private final BasicPublishBody _publishBody;
+ private final BasicPublishBody _publishBody;
private ContentHeaderBody _contentHeaderBody;
@@ -83,6 +89,8 @@
* messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
+ private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
+ private AtomicBoolean _taken;
public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
@@ -96,7 +104,9 @@
_publishBody = publishBody;
_store = messageStore;
_contentBodies = new LinkedList<ContentBody>();
+ _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_storeWhenComplete = storeWhenComplete;
+ _taken = new AtomicBoolean(false);
}
public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
@@ -107,6 +117,7 @@
_publishBody = publishBody;
_contentHeaderBody = contentHeaderBody;
_contentBodies = contentBodies;
+ _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_messageId = messageId;
_store = store;
storeMessage();
@@ -271,7 +282,7 @@
{
_store.removeMessage(_messageId);
}
- catch(AMQException e)
+ catch (AMQException e)
{
//to maintain consistency, we revert the count
incrementReference();
@@ -292,7 +303,7 @@
public boolean checkToken(Object token)
{
- if(_tokens.contains(token))
+ if (_tokens.contains(token))
{
return true;
}
@@ -308,7 +319,7 @@
//if the message is not persistent or the queue is not durable
//we will not need to recover the association and so do not
//need to record it
- if(isPersistent() && queue.isDurable())
+ if (isPersistent() && queue.isDurable())
{
_store.enqueueMessage(queue.getName(), _messageId);
}
@@ -318,7 +329,7 @@
{
//only record associations where both queue and message will survive
//a restart, so only need to remove association if this is the case
- if(isPersistent() && queue.isDurable())
+ if (isPersistent() && queue.isDurable())
{
_store.dequeueMessage(queue.getName(), _messageId);
}
@@ -326,14 +337,14 @@
public boolean isPersistent() throws AMQException
{
- if(_contentHeaderBody == null)
+ if (_contentHeaderBody == null)
{
throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
}
//todo remove literal values to a constant file such as AMQConstants in common
return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
- &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
}
public void setTxnBuffer(TxnBuffer buffer)
@@ -352,8 +363,9 @@
* immediate delivery but has not been marked as delivered to a
* consumer
*/
- public void checkDeliveredToConsumer() throws NoConsumersException{
- if(isImmediate() && !_deliveredToConsumer)
+ public void checkDeliveredToConsumer() throws NoConsumersException
+ {
+ if (isImmediate() && !_deliveredToConsumer)
{
throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
}
@@ -362,8 +374,64 @@
/**
* Called when this message is delivered to a consumer. (used to
* implement the 'immediate' flag functionality).
+ * And by selectors to determin if the message has already been sent
*/
- public void setDeliveredToConsumer(){
+ public void setDeliveredToConsumer()
+ {
_deliveredToConsumer = true;
+ }
+
+ /**
+ * Called selectors to determin if the message has already been sent
+ * @return _deliveredToConsumer
+ */
+ public boolean getDeliveredToConsumer()
+ {
+ return _deliveredToConsumer;
+ }
+
+
+ public MessageDecorator getDecodedMessage(String type)
+ {
+ MessageDecorator msgtype = null;
+
+ if (_decodedMessages != null)
+ {
+ msgtype = _decodedMessages.get(type);
+
+ if (msgtype == null)
+ {
+ msgtype = decorateMessage(type);
+ }
+ }
+
+ return msgtype;
+ }
+
+ private MessageDecorator decorateMessage(String type)
+ {
+ MessageDecorator msgdec = null;
+
+ if (type.equals(JMS_MESSAGE))
+ {
+ msgdec = new JMSMessage(this);
+ }
+
+ if (msgdec != null)
+ {
+ _decodedMessages.put(type, msgdec);
+ }
+
+ return msgdec;
+ }
+
+ public boolean taken()
+ {
+ return _taken.getAndSet(true);
+ }
+
+ public void release()
+ {
+ _taken.set(false);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Dec 19 02:51:39 2006
@@ -22,6 +22,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -187,16 +188,29 @@
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
- //fixme - Pick one.
- if (Boolean.getBoolean("concurrentdeliverymanager"))
+ //fixme - Make this configurable via the broker config.xml
+ if (System.getProperties().getProperty("deliverymanager") != null)
{
- _logger.info("Using ConcurrentDeliveryManager");
- _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+ if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
+ {
+ _logger.info("Using ConcurrentSelectorDeliveryManager");
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+ }
+ else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
+ {
+ _logger.info("Using ConcurrentDeliveryManager");
+ _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+ }
+ else
+ {
+ _logger.info("Using SynchronizedDeliveryManager");
+ _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+ }
}
else
{
- _logger.info("Using SynchronizedDeliveryManager");
- _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+ _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
}
@@ -348,12 +362,12 @@
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters)
throws AMQException
{
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters);
_subscribers.addSubscriber(subscription);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Tue Dec 19 02:51:39 2006
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.AMQException;
+
+import java.util.Queue;
+
public interface Subscription
{
void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
@@ -27,4 +31,13 @@
boolean isSuspended();
void queueDeleted(AMQQueue queue);
+
+ boolean hasFilters();
+
+ boolean hasInterest(AMQMessage msg);
+
+ Queue<AMQMessage> getPreDeliveryQueue();
+
+ void enqueueForPreDelivery(AMQMessage msg);
+
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java Tue Dec 19 02:51:39 2006
@@ -22,6 +22,7 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
/**
* Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
@@ -32,6 +33,9 @@
*/
public interface SubscriptionFactory
{
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters)
+ throws AMQException;
+
Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
throws AMQException;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Dec 19 02:51:39 2006
@@ -23,12 +23,18 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import java.util.Queue;
+
/**
* Encapsulation of a supscription to a queue.
* <p/>
@@ -48,23 +54,32 @@
private final Object sessionKey;
+ private Queue<AMQMessage> _messages;
+
+
/**
* True if messages need to be acknowledged
*/
private final boolean _acks;
+ private FilterManager _filters;
public static class Factory implements SubscriptionFactory
{
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+ {
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters);
+ }
+
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null);
}
}
@@ -72,6 +87,13 @@
String consumerTag, boolean acks)
throws AMQException
{
+ this(channelId, protocolSession, consumerTag, acks, null);
+ }
+
+ public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+ String consumerTag, boolean acks, FieldTable filters)
+ throws AMQException
+ {
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
{
@@ -83,6 +105,17 @@
this.consumerTag = consumerTag;
sessionKey = protocolSession.getKey();
_acks = acks;
+ _filters = FilterManagerFactory.createManager(filters);
+
+ if (_filters != null)
+ {
+ _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ }
+ else
+ {
+ // Reference the DeliveryManager
+ _messages = null;
+ }
}
public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
@@ -131,7 +164,7 @@
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
-
+
// By doing this _before_ the send we ensure that it
// doesn't get sent if it can't be dequeued, preventing
// duplicate delivery on recovery.
@@ -177,6 +210,32 @@
{
channel.queueDeleted(queue);
}
+
+ public boolean hasFilters()
+ {
+ return _filters != null;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return _filters.allAllow(msg);
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return _messages;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ if (_messages != null)
+ {
+ _messages.offer(msg);
+ }
+ }
+
+
+
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Tue Dec 19 02:51:39 2006
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.queue;
+import java.util.List;
+
/**
* Abstraction of actor that will determine the subscriber to whom
* a message will be sent.
*/
public interface SubscriptionManager
{
+ public List<Subscription> getSubscriptions();
public boolean hasActiveSubscribers();
public Subscription nextSubscriber(AMQMessage msg);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Tue Dec 19 02:51:39 2006
@@ -21,6 +21,8 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -58,6 +60,7 @@
/**
* Remove the subscription, returning it if it was found
+ *
* @param subscription
* @return null if no match was found
*/
@@ -90,7 +93,7 @@
/**
* Return the next unsuspended subscription or null if not found.
- *
+ * <p/>
* Performance note:
* This method can scan all items twice when looking for a subscription that is not
* suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
@@ -105,31 +108,58 @@
return null;
}
- try {
- final Subscription result = nextSubscriber();
- if (result == null) {
+ try
+ {
+ final Subscription result = nextSubscriberImpl(msg);
+ if (result == null)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
- } else {
+ return nextSubscriberImpl(msg);
+ }
+ else
+ {
return result;
}
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
+ return nextSubscriber(msg);
}
}
- private Subscription nextSubscriber()
+ private Subscription nextSubscriberImpl(AMQMessage msg)
{
final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext()) {
+ while (iterator.hasNext())
+ {
Subscription subscription = iterator.next();
++_currentSubscriber;
subscriberScanned();
- if (!subscription.isSuspended()) {
- return subscription;
+
+ if (!subscription.isSuspended())
+ {
+ if (!subscription.hasFilters())
+ {
+ return subscription;
+ }
+ else
+ {
+ if (subscription.hasInterest(msg))
+ {
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
+ }
+ }
}
}
+
return null;
}
@@ -145,11 +175,19 @@
return _subscriptions.isEmpty();
}
+ public List<Subscription> getSubscriptions()
+ {
+ return _subscriptions;
+ }
+
public boolean hasActiveSubscribers()
{
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) return true;
+ if (!s.isSuspended())
+ {
+ return true;
+ }
}
return false;
}
@@ -159,7 +197,10 @@
int count = 0;
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) count++;
+ if (!s.isSuspended())
+ {
+ count++;
+ }
}
return count;
}
@@ -177,7 +218,8 @@
}
}
- int size() {
+ int size()
+ {
return _subscriptions.size();
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java Tue Dec 19 02:51:39 2006
@@ -35,7 +35,7 @@
*/
class SynchronizedDeliveryManager implements DeliveryManager
{
- private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);
+ private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);
/**
* Holds any queued messages
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Dec 19 02:51:39 2006
@@ -23,11 +23,13 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
@@ -49,6 +51,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -176,7 +179,7 @@
{
if (message.deliverBody != null)
{
- final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag);
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
if (consumer == null)
{
@@ -210,17 +213,15 @@
{
_connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
}
+ else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+ }
else
{
- if (errorCode == AMQConstant.NO_ROUTE.getCode())
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
+ _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
}
+
}
catch (Exception e)
{
@@ -734,7 +735,6 @@
}
-
public MessageListener getMessageListener() throws JMSException
{
checkNotClosed();
@@ -954,6 +954,12 @@
{
registerConsumer(consumer, false);
}
+ catch (AMQInvalidSelectorException ise)
+ {
+ JMSException ex = new InvalidSelectorException(ise.getMessage());
+ ex.setLinkedException(ise);
+ throw ex;
+ }
catch (AMQException e)
{
JMSException ex = new JMSException("Error registering consumer: " + e);
@@ -963,7 +969,7 @@
synchronized(destination)
{
- _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger());
+ _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
_destinationConsumerCount.get(destination).incrementAndGet();
}
@@ -975,16 +981,16 @@
private void checkTemporaryDestination(Destination destination)
throws JMSException
{
- if((destination instanceof TemporaryDestination))
+ if ((destination instanceof TemporaryDestination))
{
_logger.debug("destination is temporary");
final TemporaryDestination tempDest = (TemporaryDestination) destination;
- if(tempDest.getSession() != this)
+ if (tempDest.getSession() != this)
{
_logger.debug("destination is on different session");
throw new JMSException("Cannot consume from a temporary destination created onanother session");
}
- if(tempDest.isDeleted())
+ if (tempDest.isDeleted())
{
_logger.debug("destination is deleted");
throw new JMSException("Cannot consume from a deleted destination");
@@ -1065,12 +1071,19 @@
* @return the consumer tag generated by the broker
*/
private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
- boolean nowait) throws AMQException
+ boolean nowait, String messageSelector) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
String tag = Integer.toString(_nextTag++);
+ FieldTable arguments = FieldTableFactory.newFieldTable();
+ if (messageSelector != null)
+ {
+ //fixme move literal value to a common class.
+ arguments.put("x-filter-jms-selector", messageSelector);
+ }
+
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
_consumers.put(tag, consumer);
@@ -1080,7 +1093,7 @@
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
queueName, tag, consumer.isNoLocal(),
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
- consumer.isExclusive(), nowait);
+ consumer.isExclusive(), nowait, arguments);
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1220,7 +1233,7 @@
{
checkNotClosed();
checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection);
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
if (subscriber != null)
{
@@ -1247,8 +1260,8 @@
subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
- _subscriptions.put(name,subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
return subscriber;
}
@@ -1278,8 +1291,8 @@
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
- _subscriptions.put(name,subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
return subscriber;
}
@@ -1476,7 +1489,14 @@
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- consumeFromQueue(consumer, queueName, protocolHandler, nowait);
+ try
+ {
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
+ }
+ catch (JMSException e) //thrown by getMessageSelector
+ {
+ throw new AMQException(e.getMessage(), e);
+ }
}
/**
@@ -1489,7 +1509,7 @@
{
_consumers.remove(consumer.getConsumerTag());
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
- if(subscriptionName != null)
+ if (subscriptionName != null)
{
_subscriptions.remove(subscriptionName);
}
@@ -1497,7 +1517,7 @@
Destination dest = consumer.getDestination();
synchronized(dest)
{
- if(_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
{
_destinationConsumerCount.remove(dest);
}
@@ -1576,7 +1596,7 @@
{
throw new javax.jms.InvalidDestinationException("Invalid Topic");
}
- if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this)
+ if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this)
{
throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Dec 19 02:51:39 2006
@@ -23,6 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.protocol.AMQConstant;
@@ -46,7 +47,7 @@
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
- _logger.debug("ChannelClose method received");
+ _logger.debug("ChannelClose method received");
ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
int errorCode = method.replyCode;
@@ -65,17 +66,21 @@
{
throw new AMQNoConsumersException("Error: " + reason, null);
}
+ else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ {
+ throw new AMQNoRouteException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode())
+ {
+ _logger.info("Broker responded with Invalid Selector.");
+
+ throw new AMQInvalidSelectorException(reason);
+ }
else
{
- if (errorCode == AMQConstant.NO_ROUTE.getCode())
- {
- throw new AMQNoRouteException("Error: " + reason, null);
- }
- else
- {
- throw new AMQChannelClosedException(errorCode, "Error: " + reason);
- }
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
}
+
}
evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Dec 19 02:51:39 2006
@@ -26,7 +26,8 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Tue Dec 19 02:51:39 2006
@@ -110,7 +110,7 @@
}
else
{
- throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught.
+ throw new AMQException("Woken up due to " + _error.getClass(), _error); // FIXME: This will wrap FailoverException and prevent it being caught.
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Tue Dec 19 02:51:39 2006
@@ -59,7 +59,7 @@
// once more testing of the performance of the simple allocator has been done
if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
{
- _logger.warn("Using SimpleByteBufferAllocator");
+ _logger.info("Using SimpleByteBufferAllocator");
ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Tue Dec 19 02:51:39 2006
@@ -40,11 +40,15 @@
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
public class PropertiesFileInitialContextFactory implements InitialContextFactory
{
- protected final Logger _logger = Logger.getLogger(getClass());
+ protected final Logger _logger = Logger.getLogger(PropertiesFileInitialContextFactory.class);
private String CONNECTION_FACTORY_PREFIX = "connectionfactory.";
private String DESTINATION_PREFIX = "destination.";
@@ -54,6 +58,41 @@
public Context getInitialContext(Hashtable environment) throws NamingException
{
Map data = new ConcurrentHashMap();
+
+ try
+ {
+
+ String file = null;
+ if (environment.contains(Context.PROVIDER_URL))
+ {
+ file = (String) environment.get(Context.PROVIDER_URL);
+ }
+ else
+ {
+ file = System.getProperty(Context.PROVIDER_URL);
+ }
+
+ if (file != null)
+ {
+ _logger.info("Loading Properties from:" + file);
+ //Load the properties specified
+ Properties p = new Properties();
+
+ p.load(new BufferedInputStream(new FileInputStream(file)));
+
+ environment.putAll(p);
+ _logger.info("Loaded Context Properties:" + environment.toString());
+ }
+ else
+ {
+ _logger.warn("No Provider URL specified.");
+ }
+ }
+ catch (IOException ioe)
+ {
+ _logger.warn("Unable to load property file specified in Provider_URL:" +
+ environment.get(Context.PROVIDER_URL));
+ }
createConnectionFactories(data, environment);
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java Tue Dec 19 02:51:39 2006
@@ -23,6 +23,8 @@
import org.apache.log4j.Logger;
import org.apache.qpid.server.cluster.util.LogMessage;
+import java.util.List;
+
class ClusteredSubscriptionManager extends SubscriptionSet
{
private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
@@ -80,6 +82,11 @@
public int getWeight()
{
return ClusteredSubscriptionManager.this.getWeight();
+ }
+
+ public List<Subscription> getSubscriptions()
+ {
+ return ClusteredSubscriptionManager.super.getSubscriptions();
}
public boolean hasActiveSubscribers()
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java Tue Dec 19 02:51:39 2006
@@ -21,12 +21,12 @@
package org.apache.qpid.server.queue;
import java.util.List;
+import java.util.LinkedList;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Distributes messages among a list of subsscription managers, using their
* weighting.
- *
*/
class NestedSubscriptionManager implements SubscriptionManager
{
@@ -44,11 +44,24 @@
_subscribers.remove(s);
}
+
+ public List<Subscription> getSubscriptions()
+ {
+ List<Subscription> allSubs = new LinkedList<Subscription>();
+
+ for (WeightedSubscriptionManager subMans : _subscribers)
+ {
+ allSubs.addAll(subMans.getSubscriptions());
+ }
+
+ return allSubs;
+ }
+
public boolean hasActiveSubscribers()
{
- for(WeightedSubscriptionManager s : _subscribers)
+ for (WeightedSubscriptionManager s : _subscribers)
{
- if(s.hasActiveSubscribers())
+ if (s.hasActiveSubscribers())
{
return true;
}
@@ -59,9 +72,9 @@
public Subscription nextSubscriber(AMQMessage msg)
{
WeightedSubscriptionManager start = current();
- for(WeightedSubscriptionManager s = start; s != null; s = next(start))
+ for (WeightedSubscriptionManager s = start; s != null; s = next(start))
{
- if(hasMore(s))
+ if (hasMore(s))
{
return nextSubscriber(s);
}
@@ -94,7 +107,7 @@
private WeightedSubscriptionManager next()
{
_iterations = 0;
- if(++_index >= _subscribers.size())
+ if (++_index >= _subscribers.size())
{
_index = 0;
}
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Tue Dec 19 02:51:39 2006
@@ -25,6 +25,9 @@
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.AMQException;
+import java.util.Queue;
+import java.util.List;
+
class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
{
private final GroupManager _groupMgr;
@@ -76,6 +79,11 @@
return _count;
}
+ public List<Subscription> getSubscriptions()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean hasActiveSubscribers()
{
return getWeight() == 0;
@@ -88,9 +96,34 @@
public void queueDeleted(AMQQueue queue)
{
- if(queue instanceof ClusteredQueue)
+ if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
}
+ }
+
+ public boolean hasFilters()
+ {
+ return false;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return true;
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return null;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
+ }
+
+ public void sendNextMessage(AMQQueue queue)
+ {
+
}
}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties?view=auto&rev=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties Tue Dec 19 02:51:39 2006
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java Tue Dec 19 02:51:39 2006
@@ -483,7 +483,7 @@
{
return _properties.containsKey(name) && (_properties.get(name) == null) &&
_propertyNamesTypeMap.get(name).equals(Prefix.AMQP_NULL_STRING_PROPERTY_PREFIX);
-
+
}
@@ -606,7 +606,8 @@
// AMQ start character
if (!(Character.isLetter(propertyName.charAt(0))
|| propertyName.charAt(0) == '$'
- || propertyName.charAt(0) == '#'))
+ || propertyName.charAt(0) == '#'
+ || propertyName.charAt(0) == '_')) // Not official AMQP added for JMS.
{
throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid AMQP start character");
}
@@ -1156,9 +1157,9 @@
if (type == null)
{
String msg = "Field '" + key + "' - unsupported field table type: " + type + ".";
- //some extra trace information...
- msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining;
- throw new AMQFrameDecodingException(msg);
+ //some extra trace information...
+ msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining;
+ throw new AMQFrameDecodingException(msg);
}
Object value;
@@ -1203,7 +1204,7 @@
value = EncodingUtils.readBytes(buffer);
break;
default:
- String msg = "Internal error, the following type identifier is not handled: " + type;
+ String msg = "Internal error, the following type identifier is not handled: " + type;
throw new AMQFrameDecodingException(msg);
}