Posted to commits@qpid.apache.org by ri...@apache.org on 2006/12/19 11:51:42 UTC

svn commit: r488624 [1/2] - in /incubator/qpid/trunk/qpid: java/broker/ java/broker/src/main/grammar/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/filter/ java/broker/src/main/java/org/apache/qpid/s...

Author: ritchiem
Date: Tue Dec 19 02:51:39 2006
New Revision: 488624

URL: http://svn.apache.org/viewvc?view=rev&rev=488624
Log:
QPID-21
Added:
SelectorParser.jj - ActiveMQ selector javacc grammar used to generate SelectorParser.java
server/filter - Selector filtering code from the ActiveMQ project, adjusted to suit our class and package structure.
server/message - Decorator classes to allow access to the JMSMessage inside the AMQMessage
ConcurrentSelectorDeliveryManager.java - A new DeliveryManager that utilises PreDeliveryQueues to implement selectors
AMQInvalidSelectorException.java - thrown on client and broker when the Selector text is invalid.
Common: log4j.properties - added to remove the log4j warnings raised by the Common tests.

Modified:
broker/pom.xml - to generate SelectorParser.java
AMQChannel.java - Addition of argument fieldtable for filter setup.
BasicConsumeMethodHandler.java - writing of InvalidSelector channel close exception.
AMQMessage.java - Added decorator to get access to the enclosed JMSMessage
AMQQueue.java - Enhanced 'deliverymanager' property to allow the selection of the ConcurrentSelectorDeliveryManager.
Subscription.java - Enhanced interface to allow a subscription to state an 'interest' in a given message.
SubscriptionFactory.java - Added method to allow passing of filter arguments.
SubscriptionImpl.java - Implemented new Subscription.java methods.
SubscriptionManager.java - Added ability to get a list of current subscribers.
SubscriptionSet.java - augmented nextSubscriber to allow the subscriber to exercise the new hasInterest feature.
SynchronizedDeliveryManager.java - fixed Logging class
AMQSession - Added filter extraction from consume call and pass it on to the registration.
ChannelCloseMethodHandler.java - Handle the reception and correct raising of the InvalidSelector Exception
AbstractJMSMessage.java - Expanded imports
BlockingMethodFrameListener.java - added extra info to a debug output line.
SocketTransportConnection.java - made output an info not a warn.
PropertiesFileInitialContextFactory.java - updated to allow the PROVIDER_URL to specify a property file to read in for the initial values.
ClusteredSubscriptionManager.java - Implementation of SubscriptionSet.java
NestedSubscriptionManager.java - Implementation of SubscriptionManager.java
RemoteSubscriptionImpl.java - Implementation of Subscription.java
AMQConstant.java - Added '322' "Invalid Selector"
SubscriptionTestHelper.java - Implementation of Subscription.java

Edited specs/amqp-8.0.xml to add field table to consume method.

Thanks to the ActiveMQ project for writing the initial SelectorParser.jj and associated filter Expressions.
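
To illustrate the 'interest' idea described in the log above, here is a minimal, self-contained sketch of a round-robin subscriber set that skips subscribers with no interest in a message. It is not the code from this commit: the Subscriber and SubscriberSet types, the subscriber() helper, and the use of plain strings as messages are all hypothetical stand-ins for Subscription, SubscriptionSet and AMQMessage.

```java
import java.util.List;
import java.util.function.Predicate;

public class InterestSketch {
    /** Hypothetical stand-in for the broker's Subscription interface. */
    interface Subscriber {
        String name();
        boolean hasInterest(String message);
    }

    /** Hypothetical round-robin holder, loosely modelled on the nextSubscriber idea. */
    static final class SubscriberSet {
        private final List<Subscriber> subscribers;
        private int next = 0; // index at which the next search starts

        SubscriberSet(List<Subscriber> subscribers) {
            this.subscribers = List.copyOf(subscribers);
        }

        /** Returns the next subscriber interested in the message, or null if none is. */
        Subscriber nextSubscriber(String message) {
            for (int i = 0; i < subscribers.size(); i++) {
                int index = (next + i) % subscribers.size();
                Subscriber candidate = subscribers.get(index);
                if (candidate.hasInterest(message)) {
                    // Resume the rotation just after the subscriber we picked.
                    next = (index + 1) % subscribers.size();
                    return candidate;
                }
            }
            return null;
        }
    }

    /** Builds a subscriber whose interest is decided by a predicate (a stand-in for a selector). */
    static Subscriber subscriber(String name, Predicate<String> selector) {
        return new Subscriber() {
            public String name() { return name; }
            public boolean hasInterest(String message) { return selector.test(message); }
        };
    }

    public static void main(String[] args) {
        SubscriberSet set = new SubscriberSet(List.of(
                subscriber("all", m -> true),
                subscriber("urgent-only", m -> m.startsWith("urgent"))));
        System.out.println(set.nextSubscriber("urgent: disk full").name());
        System.out.println(set.nextSubscriber("routine report").name());
        System.out.println(set.nextSubscriber("urgent: link down").name());
    }
}
```

In this sketch a message that no subscriber is interested in yields null, leaving it to the caller to decide whether to hold the message back; how the real delivery managers handle that case is not shown in this part of the commit.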

Added:
    incubator/qpid/trunk/qpid/java/broker/src/main/grammar/
      - copied from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/grammar/
    incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/grammar/SelectorParser.jj
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/
      - copied from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
      - copied, changed from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
      - copied, changed from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
      - copied, changed from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
      - copied, changed from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/
      - copied from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/
      - copied from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms/
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
      - copied, changed from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties   (with props)
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
      - copied unchanged from r488302, incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/pom.xml
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
    incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
    incubator/qpid/trunk/qpid/specs/amqp-8.0.xml

Modified: incubator/qpid/trunk/qpid/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/pom.xml?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/broker/pom.xml Tue Dec 19 02:51:39 2006
@@ -55,6 +55,10 @@
             <artifactId>commons-lang</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jms_1.1_spec</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.mina</groupId>
             <artifactId>mina-filter-ssl</artifactId>
         </dependency>
@@ -82,6 +86,25 @@
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>javacc-maven-plugin</artifactId>
+                <version>2.0</version>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <configuration>
+                            <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory>
+                            <outputDirectory>${basedir}/target/generated</outputDirectory>
+                            <packageName>org.apache.qpid.server.filter.jms.selector</packageName>
+                        </configuration>
+                        <goals>
+                            <goal>javacc</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Dec 19 02:51:39 2006
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.ack.TxAck;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -290,7 +291,7 @@
      * @throws ConsumerTagNotUniqueException if the tag is not unique
      * @throws AMQException                  if something goes wrong
      */
-    public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException
+    public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable filters) throws AMQException, ConsumerTagNotUniqueException
     {
         if (tag == null)
         {
@@ -301,7 +302,7 @@
             throw new ConsumerTagNotUniqueException();
         }
 
-        queue.registerProtocolSession(session, _channelId, tag, acks);
+        queue.registerProtocolSession(session, _channelId, tag, acks, filters);
         _consumerTag2QueueMap.put(tag, queue);
         return tag;
     }

Copied: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java (from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java?view=diff&rev=488624&p1=incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java&r1=488302&p2=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java&r2=488624
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java Tue Dec 19 02:51:39 2006
@@ -32,4 +32,6 @@
     void remove(MessageFilter filter);
 
     boolean allAllow(AMQMessage msg);
+
+    boolean hasFilters();
 }

Copied: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java?view=diff&rev=488624&p1=incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java&r1=488302&p2=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java&r2=488624
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java Tue Dec 19 02:51:39 2006
@@ -53,16 +53,29 @@
                 _logger.info("filter:" + key);
                 if (key.equals(JMS_SELECTOR_FILTER))
                 {
-                    manager.add(new JMSSelectorFilter((String) filters.get(key)));
+                    String selector = (String) filters.get(key);
+
+                    if (selector != null && !selector.equals(""))
+                    {
+                        manager.add(new JMSSelectorFilter(selector));
+                    }
                 }
 
             }
+
+            //If we added no filters don't bear the overhead of having an filter manager
+            if (!manager.hasFilters())
+            {
+                manager = null;
+            }
         }
         else
         {
             _logger.info("No Filters found.");
         }
 
+
         return manager;
+
     }
 }
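
The change to FilterManagerFactory above can be read as "only keep a filter manager when at least one real filter was added". A minimal sketch of that rule follows, under stated assumptions: the Filters class, the createManager method, the SELECTOR_KEY value, and the equality check standing in for selector parsing are all hypothetical, and a plain Map replaces FieldTable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class FilterFactorySketch {
    // Hypothetical argument key; the value of the real JMS_SELECTOR_FILTER constant is not shown in this diff.
    static final String SELECTOR_KEY = "x-example-jms-selector";

    /** Hypothetical stand-in for a filter manager: every filter must allow the message. */
    static final class Filters {
        private final List<Predicate<Map<String, ?>>> filters = new ArrayList<>();

        void add(Predicate<Map<String, ?>> filter) {
            filters.add(filter);
        }

        boolean hasFilters() {
            return !filters.isEmpty();
        }

        boolean allAllow(Map<String, ?> headers) {
            for (Predicate<Map<String, ?>> filter : filters) {
                if (!filter.test(headers)) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Returns a manager only if a non-empty selector was supplied, otherwise null. */
    static Filters createManager(Map<String, ?> arguments) {
        if (arguments == null) {
            return null; // no arguments at all, so no filters
        }
        Filters manager = new Filters();
        Object selector = arguments.get(SELECTOR_KEY);
        if (selector instanceof String && !((String) selector).isEmpty()) {
            String wanted = (String) selector;
            // Stand-in for real selector parsing: allow messages whose "type" header equals the selector text.
            manager.add(headers -> wanted.equals(headers.get("type")));
        }
        // Do not carry an empty manager around; callers treat null as "no filtering".
        return manager.hasFilters() ? manager : null;
    }

    public static void main(String[] args) {
        System.out.println(createManager(Map.of(SELECTOR_KEY, "")) == null);
        Filters manager = createManager(Map.of(SELECTOR_KEY, "order"));
        System.out.println(manager.allAllow(Map.of("type", "order")));
    }
}
```

Returning null here means callers must null-check before filtering; that trade-off avoids per-message filter calls for the common case of consumers without selectors.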

Copied: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?view=diff&rev=488624&p1=incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java&r1=488302&p2=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java&r2=488624
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Tue Dec 19 02:51:39 2006
@@ -22,22 +22,17 @@
 
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.filter.jms.selector.SelectorParser;
-import org.apache.qpid.server.message.jms.JMSMessage;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidSelectorException;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.log4j.Logger;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
 public class JMSSelectorFilter implements MessageFilter
 {
-
     private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
-// LoggerFactory.getLogger(JMSSelectorFilter.class);
 
     private String _selector;
     private BooleanExpression _matcher;
@@ -47,7 +42,6 @@
         _selector = selector;
         _logger.info("Created JMSSelectorFilter with selector:" + _selector);
 
-        // BooleanExpression activemqSelctor = new ActiveMQSelectorParser(selector);
 
         try
         {

Copied: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java (from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java?view=diff&rev=488624&p1=incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java&r1=488302&p2=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java&r2=488624
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java Tue Dec 19 02:51:39 2006
@@ -69,4 +69,9 @@
         }
         return true;
     }
+
+    public boolean hasFilters()
+    {
+        return !_filters.isEmpty();
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Dec 19 02:51:39 2006
@@ -21,10 +21,12 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.framing.BasicConsumeBody;
 import org.apache.qpid.framing.BasicConsumeOkBody;
 import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ConsumerTagNotUniqueException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -32,6 +34,7 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.log4j.Logger;
@@ -68,14 +71,14 @@
         {
             AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
 
-            if(queue == null)
+            if (queue == null)
             {
                 _log.info("No queue for '" + body.queue + "'");
             }
             try
             {
-                String consumerTag = channel.subscribeToQueue(body.consumerTag, queue,  session, !body.noAck);
-                if(!body.nowait)
+                String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.arguments);
+                if (!body.nowait)
                 {
                     session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
                 }
@@ -83,10 +86,19 @@
                 //now allow queue to start async processing of any backlog of messages
                 queue.deliverAsync();
             }
-            catch(ConsumerTagNotUniqueException e)
+            catch (AMQInvalidSelectorException ise)
+            {
+                _log.info("Closing connection due to invalid selector");
+                session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
+                                                                      ise.getMessage(), BasicConsumeBody.CLASS_ID,
+                                                                      BasicConsumeBody.METHOD_ID));
+            }
+            catch (ConsumerTagNotUniqueException e)
             {
                 String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
-                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID));
+                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
+                                                                      BasicConsumeBody.CLASS_ID,
+                                                                      BasicConsumeBody.METHOD_ID));
             }
         }
     }

Copied: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java (from r488302, incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java?view=diff&rev=488624&p1=incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java&r1=488302&p2=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java&r2=488624
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java Tue Dec 19 02:51:39 2006
@@ -31,12 +31,11 @@
 import javax.jms.MessageNotWriteableException;
 import java.util.Enumeration;
 
-public class JMSMessage implements MessageDecorator 
+public class JMSMessage implements MessageDecorator
 {
 
     private AMQMessage _message;
     private BasicContentHeaderProperties _properties;
-    private Destination _replyTo;
 
     public JMSMessage(AMQMessage message)
     {
@@ -45,13 +44,13 @@
         _properties = (BasicContentHeaderProperties) contentHeader.properties;
     }
 
-    protected void checkWriteable()
+    protected void checkWriteable() throws MessageNotWriteableException
     {
-//      The broker should not modify a message.
+        //The broker should not modify a message.
 //        if (_readableMessage)
-//        {
-//        throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
-//        }
+        {
+          throw new MessageNotWriteableException("The broker should not modify a message.");
+        }
     }
 
 
@@ -60,9 +59,10 @@
         return _properties.getMessageId();
     }
 
-    public void setJMSMessageID(String string)
+    public void setJMSMessageID(String string) throws MessageNotWriteableException
     {
         checkWriteable();
+        _properties.setMessageId(string);
     }
 
     public long getJMSTimestamp()
@@ -70,9 +70,10 @@
         return _properties.getTimestamp();
     }
 
-    public void setJMSTimestamp(long l)
+    public void setJMSTimestamp(long l) throws MessageNotWriteableException
     {
         checkWriteable();
+        _properties.setTimestamp(l);
     }
 
     public byte[] getJMSCorrelationIDAsBytes()
@@ -80,14 +81,14 @@
         return _properties.getCorrelationId().getBytes();
     }
 
-    public void setJMSCorrelationIDAsBytes(byte[] bytes)
-    {
-        checkWriteable();
-    }
+//    public void setJMSCorrelationIDAsBytes(byte[] bytes)
+//    {
+//    }
 
-    public void setJMSCorrelationID(String string)
+    public void setJMSCorrelationID(String string) throws MessageNotWriteableException
     {
         checkWriteable();
+        _properties.setCorrelationId(string);
     }
 
     public String getJMSCorrelationID()
@@ -100,20 +101,22 @@
         return _properties.getReplyTo();
     }
 
-    public void setJMSReplyTo(Destination destination)
+    public void setJMSReplyTo(Destination destination) throws MessageNotWriteableException
     {
         checkWriteable();
+        _properties.setReplyTo(destination.toString());
     }
 
     public String getJMSDestination()
     {
-        //FIXME Currently the Destination has not been defined. 
+        //fixme should be a deestination
         return "";
     }
 
-    public void setJMSDestination(Destination destination)
+    public void setJMSDestination(Destination destination) throws MessageNotWriteableException
     {
         checkWriteable();
+        //_properties.setDestination(destination.toString());
     }
 
     public int getJMSDeliveryMode()
@@ -121,9 +124,10 @@
         return _properties.getDeliveryMode();
     }
 
-    public void setJMSDeliveryMode(int i)
+    public void setJMSDeliveryMode(byte i) throws MessageNotWriteableException
     {
         checkWriteable();
+        _properties.setDeliveryMode(i);
     }
 
     public boolean getJMSRedelivered()
@@ -131,9 +135,10 @@
         return _message.isRedelivered();
     }
 
-    public void setJMSRedelivered(boolean b)
+    public void setJMSRedelivered(boolean b) throws MessageNotWriteableException
     {
         checkWriteable();
+        _message.setRedelivered(b);
     }
 
     public String getJMSType()
@@ -141,9 +146,10 @@
         return _properties.getType();
     }
 
-    public void setJMSType(String string)
+    public void setJMSType(String string) throws MessageNotWriteableException
     {
         checkWriteable();
+        _properties.setType(string);
     }
 
     public long getJMSExpiration()
@@ -151,9 +157,10 @@
         return _properties.getExpiration();
     }
 
-    public void setJMSExpiration(long l)
+    public void setJMSExpiration(long l) throws MessageNotWriteableException
     {
         checkWriteable();
+        _properties.setExpiration(l);
     }
 
     public int getJMSPriority()
@@ -161,122 +168,133 @@
         return _properties.getPriority();
     }
 
-    public void setJMSPriority(int i)
+    public void setJMSPriority(byte i) throws MessageNotWriteableException
     {
         checkWriteable();
+        _properties.setPriority(i);
     }
 
-    public void clearProperties()
+    public void clearProperties() throws MessageNotWriteableException
     {
         checkWriteable();
+        _properties.getJMSHeaders().clear();
     }
 
     public boolean propertyExists(String string)
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        return _properties.getJMSHeaders().propertyExists(string);
     }
 
-    public boolean getBooleanProperty(String string)
+    public boolean getBooleanProperty(String string) throws JMSException
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        return _properties.getJMSHeaders().getBoolean(string);
     }
 
-    public byte getByteProperty(String string)
+    public byte getByteProperty(String string) throws JMSException
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return _properties.getJMSHeaders().getByte(string);
     }
 
-    public short getShortProperty(String string)
+    public short getShortProperty(String string) throws JMSException
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return _properties.getJMSHeaders().getShort(string);
     }
 
-    public int getIntProperty(String string)
+    public int getIntProperty(String string) throws JMSException
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return _properties.getJMSHeaders().getInteger(string);
     }
 
-    public long getLongProperty(String string)
+    public long getLongProperty(String string) throws JMSException
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return _properties.getJMSHeaders().getLong(string);
     }
 
-    public float getFloatProperty(String string)
+    public float getFloatProperty(String string) throws JMSException
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return _properties.getJMSHeaders().getFloat(string);
     }
 
-    public double getDoubleProperty(String string)
+    public double getDoubleProperty(String string) throws JMSException
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return _properties.getJMSHeaders().getDouble(string);
     }
 
-    public String getStringProperty(String string)
+    public String getStringProperty(String string) throws JMSException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return _properties.getJMSHeaders().getString(string);
     }
 
-    public Object getObjectProperty(String string)
+    public Object getObjectProperty(String string) throws JMSException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return _properties.getJMSHeaders().getObject(string);
     }
 
     public Enumeration getPropertyNames()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return _properties.getJMSHeaders().getPropertyNames();
     }
 
-    public void setBooleanProperty(String string, boolean b)
+    public void setBooleanProperty(String string, boolean b) throws JMSException
     {
         checkWriteable();
+        _properties.getJMSHeaders().setBoolean(string, b);
     }
 
-    public void setByteProperty(String string, byte b)
+    public void setByteProperty(String string, byte b) throws JMSException
     {
         checkWriteable();
+        _properties.getJMSHeaders().setByte(string, b);
     }
 
-    public void setShortProperty(String string, short i)
+    public void setShortProperty(String string, short i) throws JMSException
     {
         checkWriteable();
+        _properties.getJMSHeaders().setShort(string, i);
     }
 
-    public void setIntProperty(String string, int i)
+    public void setIntProperty(String string, int i) throws JMSException
     {
         checkWriteable();
+        _properties.getJMSHeaders().setInteger(string, i);
     }
 
-    public void setLongProperty(String string, long l)
+    public void setLongProperty(String string, long l) throws JMSException
     {
         checkWriteable();
+        _properties.getJMSHeaders().setLong(string, l);
     }
 
-    public void setFloatProperty(String string, float v)
+    public void setFloatProperty(String string, float v) throws JMSException
     {
         checkWriteable();
+        _properties.getJMSHeaders().setFloat(string, v);
     }
 
-    public void setDoubleProperty(String string, double v)
+    public void setDoubleProperty(String string, double v) throws JMSException
     {
         checkWriteable();
+        _properties.getJMSHeaders().setDouble(string, v);
     }
 
-    public void setStringProperty(String string, String string1)
+    public void setStringProperty(String string, String string1) throws JMSException
     {
         checkWriteable();
+        _properties.getJMSHeaders().setString(string, string1);
     }
 
-    public void setObjectProperty(String string, Object object)
+    public void setObjectProperty(String string, Object object) throws JMSException
     {
         checkWriteable();
+        _properties.getJMSHeaders().setObject(string, object);
     }
 
-    public void acknowledge()
+    public void acknowledge() throws MessageNotWriteableException
     {
         checkWriteable();
     }
 
-    public void clearBody()
+    public void clearBody() throws MessageNotWriteableException
     {
         checkWriteable();
     }
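
Illustration of the guard-then-delegate pattern the setters above now follow: each one calls checkWriteable() first and only then touches the underlying headers. The sketch below is a minimal stand-alone version under stated assumptions. WriteGuardSketch, NotWriteableException and the HashMap-backed header store are hypothetical stand-ins, not the Qpid JMSMessage class or javax.jms.MessageNotWriteableException.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a decorated message; not the Qpid JMSMessage class.
class WriteGuardSketch {
    // Stand-in for javax.jms.MessageNotWriteableException (assumption).
    static class NotWriteableException extends Exception {
        NotWriteableException(String message) {
            super(message);
        }
    }

    private final Map<String, Object> headers = new HashMap<String, Object>();
    private boolean readOnly;

    void setReadOnly(boolean readOnly) {
        this.readOnly = readOnly;
    }

    private void checkWriteable() throws NotWriteableException {
        if (readOnly) {
            throw new NotWriteableException("message is read-only");
        }
    }

    // Guard first, then delegate: a rejected write never reaches the headers.
    void setIntProperty(String name, int value) throws NotWriteableException {
        checkWriteable();
        headers.put(name, value);
    }

    boolean propertyExists(String name) {
        return headers.containsKey(name);
    }

    // True when a write to a read-only message is rejected and leaves no trace.
    static boolean rejectsWriteWhenReadOnly() {
        WriteGuardSketch msg = new WriteGuardSketch();
        msg.setReadOnly(true);
        try {
            msg.setIntProperty("retries", 3);
            return false;
        } catch (NotWriteableException expected) {
            return !msg.propertyExists("retries");
        }
    }

    // True when a write to a writable message is stored.
    static boolean acceptsWriteWhenWritable() {
        WriteGuardSketch msg = new WriteGuardSketch();
        try {
            msg.setIntProperty("retries", 3);
            return msg.propertyExists("retries");
        } catch (NotWriteableException unexpected) {
            return false;
        }
    }
}
```

Putting the check before the delegation means a failed write cannot leave the header table partially updated.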

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Dec 19 02:51:39 2006
@@ -25,6 +25,8 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.message.jms.JMSMessage;
 import org.apache.qpid.AMQException;
 
 import java.util.ArrayList;
@@ -33,17 +35,21 @@
 import java.util.Set;
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Combines the information that make up a deliverable message into a more manageable form.
  */
 public class AMQMessage
 {
+    public static final String JMS_MESSAGE = "jms.message";
+
     private final Set<Object> _tokens = new HashSet<Object>();
 
     private AMQProtocolSession _publisher;
 
-    private final BasicPublishBody  _publishBody;
+    private final BasicPublishBody _publishBody;
 
     private ContentHeaderBody _contentHeaderBody;
 
@@ -83,6 +89,8 @@
      * messages published with the 'immediate' flag.
      */
     private boolean _deliveredToConsumer;
+    private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
+    private AtomicBoolean _taken;
 
 
     public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
@@ -96,7 +104,9 @@
         _publishBody = publishBody;
         _store = messageStore;
         _contentBodies = new LinkedList<ContentBody>();
+        _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
         _storeWhenComplete = storeWhenComplete;
+        _taken = new AtomicBoolean(false);
     }
 
     public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
@@ -107,6 +117,7 @@
         _publishBody = publishBody;
         _contentHeaderBody = contentHeaderBody;
         _contentBodies = contentBodies;
+        _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
         _messageId = messageId;
         _store = store;
         storeMessage();
@@ -271,7 +282,7 @@
             {
                 _store.removeMessage(_messageId);
             }
-            catch(AMQException e)
+            catch (AMQException e)
             {
                 //to maintain consistency, we revert the count
                 incrementReference();
@@ -292,7 +303,7 @@
 
     public boolean checkToken(Object token)
     {
-        if(_tokens.contains(token))
+        if (_tokens.contains(token))
         {
             return true;
         }
@@ -308,7 +319,7 @@
         //if the message is not persistent or the queue is not durable
         //we will not need to recover the association and so do not
         //need to record it
-        if(isPersistent() && queue.isDurable())
+        if (isPersistent() && queue.isDurable())
         {
             _store.enqueueMessage(queue.getName(), _messageId);
         }
@@ -318,7 +329,7 @@
     {
         //only record associations where both queue and message will survive
         //a restart, so only need to remove association if this is the case
-        if(isPersistent() && queue.isDurable())
+        if (isPersistent() && queue.isDurable())
         {
             _store.dequeueMessage(queue.getName(), _messageId);
         }
@@ -326,14 +337,14 @@
 
     public boolean isPersistent() throws AMQException
     {
-        if(_contentHeaderBody == null)
+        if (_contentHeaderBody == null)
         {
             throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
         }
 
         //todo remove literal values to a constant file such as AMQConstants in common
         return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
-                &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+               && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
     }
 
     public void setTxnBuffer(TxnBuffer buffer)
@@ -352,8 +363,9 @@
      * immediate delivery but has not been marked as delivered to a
      * consumer
      */
-    public void checkDeliveredToConsumer() throws NoConsumersException{
-        if(isImmediate() && !_deliveredToConsumer)
+    public void checkDeliveredToConsumer() throws NoConsumersException
+    {
+        if (isImmediate() && !_deliveredToConsumer)
         {
             throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
         }
@@ -362,8 +374,64 @@
     /**
      * Called when this message is delivered to a consumer. (used to
      * implement the 'immediate' flag functionality).
+     * Also used by selectors to determine whether the message has already been sent.
      */
-    public void setDeliveredToConsumer(){
+    public void setDeliveredToConsumer()
+    {
         _deliveredToConsumer = true;
+    }
+
+    /**
+     * Called by selectors to determine whether the message has already been sent.
+     * @return   _deliveredToConsumer
+     */
+    public boolean getDeliveredToConsumer()
+    {
+        return _deliveredToConsumer;
+    }
+
+
+    public MessageDecorator getDecodedMessage(String type)
+    {
+        MessageDecorator msgtype = null;
+
+        if (_decodedMessages != null)
+        {
+            msgtype = _decodedMessages.get(type);
+
+            if (msgtype == null)
+            {
+                msgtype = decorateMessage(type);
+            }
+        }
+
+        return msgtype;
+    }
+
+    private MessageDecorator decorateMessage(String type)
+    {
+        MessageDecorator msgdec = null;
+
+        if (type.equals(JMS_MESSAGE))
+        {
+            msgdec = new JMSMessage(this);
+        }
+
+        if (msgdec != null)
+        {
+            _decodedMessages.put(type, msgdec);
+        }
+
+        return msgdec;
+    }
+
+    public boolean taken()
+    {
+        return _taken.getAndSet(true);
+    }
+
+    public void release()
+    {
+        _taken.set(false);
     }
 }
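
Illustration of two details in the AMQMessage changes above. First, taken() uses AtomicBoolean.getAndSet(true), which returns the previous value, so a false result means the caller has just claimed the message. Second, getDecodedMessage() does a get followed by a separate put, so two threads could each build a decorator; putIfAbsent is one possible way to keep only the first. The sketch is stand-alone: ClaimFlagSketch and the plain Object used as the decorator are hypothetical stand-ins, not Qpid classes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the claim flag and decorator cache; not AMQMessage itself.
class ClaimFlagSketch {
    private final AtomicBoolean taken = new AtomicBoolean(false);
    private final ConcurrentHashMap<String, Object> decoded =
            new ConcurrentHashMap<String, Object>();

    // Returns the PREVIOUS state: false means this caller has just claimed the message.
    boolean taken() {
        return taken.getAndSet(true);
    }

    // Makes the message claimable again.
    void release() {
        taken.set(false);
    }

    // Builds the decorator lazily; putIfAbsent keeps whichever instance was stored first.
    Object getDecoded(String type) {
        Object existing = decoded.get(type);
        if (existing != null) {
            return existing;
        }
        Object created = new Object(); // stand-in for a decorator such as new JMSMessage(this)
        Object raced = decoded.putIfAbsent(type, created);
        return raced != null ? raced : created;
    }
}
```

A caller would typically write "if (!msg.taken()) { deliver }", since only the first caller after a release sees false.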

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Dec 19 02:51:39 2006
@@ -22,6 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
@@ -187,16 +188,29 @@
         _subscribers = subscribers;
         _subscriptionFactory = subscriptionFactory;
 
-        //fixme - Pick one.
-        if (Boolean.getBoolean("concurrentdeliverymanager"))
+        //fixme - Make this configurable via the broker config.xml
+        if (System.getProperties().getProperty("deliverymanager") != null)
         {
-            _logger.info("Using ConcurrentDeliveryManager");
-            _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+            if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
+            {
+                _logger.info("Using ConcurrentSelectorDeliveryManager");
+                _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+            }
+            else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
+            {
+                _logger.info("Using ConcurrentDeliveryManager");
+                _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+            }
+            else
+            {
+                _logger.info("Using SynchronizedDeliveryManager");
+                _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+            }
         }
         else
         {
-            _logger.info("Using SynchronizedDeliveryManager");
-            _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+            _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
+            _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
         }
     }
 
@@ -348,12 +362,12 @@
         _bindings.addBinding(routingKey, exchange);
     }
 
-    public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks)
+    public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters)
             throws AMQException
     {
         debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
 
-        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks);
+        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters);
         _subscribers.addSubscriber(subscription);
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Tue Dec 19 02:51:39 2006
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.AMQException;
+
+import java.util.Queue;
+
 public interface Subscription
 {
     void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
@@ -27,4 +31,13 @@
     boolean isSuspended();
 
     void queueDeleted(AMQQueue queue);
+
+    boolean hasFilters();
+
+    boolean hasInterest(AMQMessage msg);
+
+    Queue<AMQMessage> getPreDeliveryQueue();
+
+    void enqueueForPreDelivery(AMQMessage msg);
+
 }
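
Illustration of how the four methods added to the Subscription interface above can fit together: a subscription with a filter owns a pre-delivery queue, and one without a filter has none. The sketch is stand-alone and makes several assumptions. Messages are plain strings, Filter is a hypothetical stand-in for the filter manager, and java.util.concurrent.ConcurrentLinkedQueue is used in place of any Qpid queue class. Treating a missing filter as "interested in everything" is a choice of this sketch.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a filtered subscription; messages are plain strings here.
class FilteredSubscriptionSketch {
    // Stand-in for a selector/filter manager (assumption).
    interface Filter {
        boolean allows(String message);
    }

    private final Filter filter;             // null when the subscription has no selector
    private final Queue<String> preDelivery; // only allocated when a filter is present

    FilteredSubscriptionSketch(Filter filter) {
        this.filter = filter;
        this.preDelivery = (filter != null) ? new ConcurrentLinkedQueue<String>() : null;
    }

    // Convenience factory: a filter that accepts messages starting with the given prefix.
    static FilteredSubscriptionSketch withPrefix(final String prefix) {
        return new FilteredSubscriptionSketch(new Filter() {
            public boolean allows(String message) {
                return message.startsWith(prefix);
            }
        });
    }

    boolean hasFilters() {
        return filter != null;
    }

    // Sketch choice: with no filter, every message is of interest.
    boolean hasInterest(String message) {
        return filter == null || filter.allows(message);
    }

    Queue<String> getPreDeliveryQueue() {
        return preDelivery;
    }

    // Silently ignores the message when there is no pre-delivery queue.
    void enqueueForPreDelivery(String message) {
        if (preDelivery != null) {
            preDelivery.offer(message);
        }
    }
}
```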

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java Tue Dec 19 02:51:39 2006
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
 
 /**
  * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
@@ -32,6 +33,9 @@
  */
 public interface SubscriptionFactory
 {
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters)
+        throws AMQException;
+
     Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
         throws AMQException;
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Dec 19 02:51:39 2006
@@ -23,12 +23,18 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 
+import java.util.Queue;
+
 /**
  * Encapsulation of a subscription to a queue.
  * <p/>
@@ -48,23 +54,32 @@
 
     private final Object sessionKey;
 
+    private Queue<AMQMessage> _messages;
+
+
     /**
      * True if messages need to be acknowledged
      */
     private final boolean _acks;
+    private FilterManager _filters;
 
     public static class Factory implements SubscriptionFactory
     {
+        public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+        {
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters);
+        }
+
         public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
                 throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null);
         }
 
         public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
                 throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null);
         }
     }
 
@@ -72,6 +87,13 @@
                             String consumerTag, boolean acks)
             throws AMQException
     {
+        this(channelId, protocolSession, consumerTag, acks, null);
+    }
+
+    public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+                            String consumerTag, boolean acks, FieldTable filters)
+            throws AMQException
+    {
         AMQChannel channel = protocolSession.getChannel(channelId);
         if (channel == null)
         {
@@ -83,6 +105,17 @@
         this.consumerTag = consumerTag;
         sessionKey = protocolSession.getKey();
         _acks = acks;
+        _filters = FilterManagerFactory.createManager(filters);
+
+        if (_filters != null)
+        {
+            _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+        }
+        else
+        {
+            // Reference the DeliveryManager
+            _messages = null;
+        }
     }
 
     public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
@@ -131,7 +164,7 @@
         {
             // if we do not need to wait for client acknowledgements
             // we can decrement the reference count immediately. 
-            
+
             // By doing this _before_ the send we ensure that it
             // doesn't get sent if it can't be dequeued, preventing
             // duplicate delivery on recovery.
@@ -177,6 +210,32 @@
     {
         channel.queueDeleted(queue);
     }
+
+    public boolean hasFilters()
+    {
+        return _filters != null;
+    }
+
+    public boolean hasInterest(AMQMessage msg)
+    {
+        return _filters.allAllow(msg);
+    }
+
+    public Queue<AMQMessage> getPreDeliveryQueue()
+    {
+        return _messages;
+    }
+
+    public void enqueueForPreDelivery(AMQMessage msg)
+    {
+        if (_messages != null)
+        {
+            _messages.offer(msg);
+        }
+    }
+
+
+
 
     private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
     {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Tue Dec 19 02:51:39 2006
@@ -20,12 +20,15 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.List;
+
 /**
  * Abstraction of actor that will determine the subscriber to whom
  * a message will be sent.
  */
 public interface SubscriptionManager
 {
+    public List<Subscription> getSubscriptions();
     public boolean hasActiveSubscribers();
     public Subscription nextSubscriber(AMQMessage msg);
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Tue Dec 19 02:51:39 2006
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
 import java.util.List;
 import java.util.ListIterator;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -58,6 +60,7 @@
 
     /**
      * Remove the subscription, returning it if it was found
+     *
      * @param subscription
      * @return null if no match was found
      */
@@ -90,7 +93,7 @@
 
     /**
      * Return the next unsuspended subscription or null if not found.
-     *
+     * <p/>
      * Performance note:
      * This method can scan all items twice when looking for a subscription that is not
      * suspended. The worst case occurs when all subscriptions are suspended. However, it does this
@@ -105,31 +108,58 @@
             return null;
         }
 
-        try {
-            final Subscription result = nextSubscriber();
-            if (result == null) {
+        try
+        {
+            final Subscription result = nextSubscriberImpl(msg);
+            if (result == null)
+            {
                 _currentSubscriber = 0;
-                return nextSubscriber();
-            } else {
+                return nextSubscriberImpl(msg);
+            }
+            else
+            {
                 return result;
             }
-        } catch (IndexOutOfBoundsException e) {
+        }
+        catch (IndexOutOfBoundsException e)
+        {
             _currentSubscriber = 0;
-            return nextSubscriber();
+            return nextSubscriber(msg);
         }
     }
 
-    private Subscription nextSubscriber()
+    private Subscription nextSubscriberImpl(AMQMessage msg)
     {
         final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
-        while (iterator.hasNext()) {
+        while (iterator.hasNext())
+        {
             Subscription subscription = iterator.next();
             ++_currentSubscriber;
             subscriberScanned();
-            if (!subscription.isSuspended()) {
-                return subscription;
+
+            if (!subscription.isSuspended())
+            {
+                if (!subscription.hasFilters())
+                {
+                    return subscription;
+                }
+                else
+                {
+                    if (subscription.hasInterest(msg))
+                    {
+                        // if the queue is not empty then this client is ready to receive a message.
+                        //FIXME the queue could be full of sent messages.
+                        // Either need to clean all PDQs after sending a message
+                        // OR have a clean up thread that runs the PDQs expunging the messages.
+                        if (subscription.getPreDeliveryQueue().isEmpty())
+                        {
+                            return subscription;
+                        }
+                    }
+                }
             }
         }
+
         return null;
     }
 
@@ -145,11 +175,19 @@
         return _subscriptions.isEmpty();
     }
 
+    public List<Subscription> getSubscriptions()
+    {
+        return _subscriptions;
+    }
+
     public boolean hasActiveSubscribers()
     {
         for (Subscription s : _subscriptions)
         {
-            if (!s.isSuspended()) return true;
+            if (!s.isSuspended())
+            {
+                return true;
+            }
         }
         return false;
     }
@@ -159,7 +197,10 @@
         int count = 0;
         for (Subscription s : _subscriptions)
         {
-            if (!s.isSuspended()) count++;
+            if (!s.isSuspended())
+            {
+                count++;
+            }
         }
         return count;
     }
@@ -177,7 +218,8 @@
         }
     }
 
-    int size() {
+    int size()
+    {
         return _subscriptions.size();
     }
 }
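
Illustration of round-robin selection with a per-message interest check, the idea behind nextSubscriberImpl(msg) above. This sketch is not the SubscriptionSet algorithm. It makes a single wrap-around pass using modular indexing, where the diff scans from the cursor to the end and then restarts from zero. RoundRobinSketch and Candidate are hypothetical names, and messages are plain strings.

```java
import java.util.List;

// Hypothetical round-robin picker; not the SubscriptionSet class.
class RoundRobinSketch {
    // Stand-in for "not suspended and interested in this message" (assumption).
    interface Candidate {
        boolean accepts(String message);
    }

    private final List<Candidate> candidates;
    private int cursor; // index of the next candidate to examine

    RoundRobinSketch(List<Candidate> candidates) {
        this.candidates = candidates;
    }

    // Convenience factory: a candidate that accepts messages starting with the prefix.
    static Candidate prefix(final String prefix) {
        return new Candidate() {
            public boolean accepts(String message) {
                return message.startsWith(prefix);
            }
        };
    }

    // Examines each candidate at most once, starting at the cursor and wrapping.
    // Returns the index of the chosen candidate, or -1 if none accepts the message.
    int next(String message) {
        int n = candidates.size();
        for (int scanned = 0; scanned < n; scanned++) {
            int index = (cursor + scanned) % n;
            if (candidates.get(index).accepts(message)) {
                cursor = (index + 1) % n; // the following call starts after the winner
                return index;
            }
        }
        return -1;
    }
}
```

Advancing the cursor past the chosen candidate is what spreads consecutive messages across subscribers when more than one is interested.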

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java Tue Dec 19 02:51:39 2006
@@ -35,7 +35,7 @@
  */
 class SynchronizedDeliveryManager implements DeliveryManager
 {
-    private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);
+    private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);
 
     /**
      * Holds any queued messages

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Dec 19 02:51:39 2006
@@ -23,11 +23,13 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQInvalidSelectorException;
 import org.apache.qpid.client.failover.FailoverSupport;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.JMSStreamMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.protocol.AMQMethodEvent;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
@@ -49,6 +51,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
 {
@@ -176,7 +179,7 @@
         {
             if (message.deliverBody != null)
             {
-                final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag);
+                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
 
                 if (consumer == null)
                 {
@@ -210,17 +213,15 @@
                     {
                         _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
                     }
+                    else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+                    {
+                        _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+                    }
                     else
                     {
-                        if (errorCode == AMQConstant.NO_ROUTE.getCode())
-                        {
-                            _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
-                        }
-                        else
-                        {
-                            _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
-                        }
+                        _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
                     }
+
                 }
                 catch (Exception e)
                 {
@@ -734,7 +735,6 @@
     }
 
 
-
     public MessageListener getMessageListener() throws JMSException
     {
         checkNotClosed();
@@ -954,6 +954,12 @@
                 {
                     registerConsumer(consumer, false);
                 }
+                catch (AMQInvalidSelectorException ise)
+                {
+                    JMSException ex = new InvalidSelectorException(ise.getMessage());
+                    ex.setLinkedException(ise);
+                    throw ex;
+                }
                 catch (AMQException e)
                 {
                     JMSException ex = new JMSException("Error registering consumer: " + e);
@@ -963,7 +969,7 @@
 
                 synchronized(destination)
                 {
-                    _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger());
+                    _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
                     _destinationConsumerCount.get(destination).incrementAndGet();
                 }
 
@@ -975,16 +981,16 @@
     private void checkTemporaryDestination(Destination destination)
             throws JMSException
     {
-        if((destination instanceof TemporaryDestination))
+        if ((destination instanceof TemporaryDestination))
         {
             _logger.debug("destination is temporary");
             final TemporaryDestination tempDest = (TemporaryDestination) destination;
-            if(tempDest.getSession() != this)
+            if (tempDest.getSession() != this)
             {
                 _logger.debug("destination is on different session");
                 throw new JMSException("Cannot consume from a temporary destination created on another session");
             }
-            if(tempDest.isDeleted())
+            if (tempDest.isDeleted())
             {
                 _logger.debug("destination is deleted");
                 throw new JMSException("Cannot consume from a deleted destination");
@@ -1065,12 +1071,19 @@
      * @return the consumer tag generated by the broker
      */
     private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
-                                  boolean nowait) throws AMQException
+                                  boolean nowait, String messageSelector) throws AMQException
     {
         //fixme prefetch values are not used here. Do we need to have them as parameters?
         //need to generate a consumer tag on the client so we can exploit the nowait flag
         String tag = Integer.toString(_nextTag++);
 
+        FieldTable arguments = FieldTableFactory.newFieldTable();
+        if (messageSelector != null)
+        {
+            //fixme move literal value to a common class.
+            arguments.put("x-filter-jms-selector", messageSelector);
+        }
+
         consumer.setConsumerTag(tag);
         // we must register the consumer in the map before we actually start listening
         _consumers.put(tag, consumer);
@@ -1080,7 +1093,7 @@
             AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
                                                                   queueName, tag, consumer.isNoLocal(),
                                                                   consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
-                                                                  consumer.isExclusive(), nowait);
+                                                                  consumer.isExclusive(), nowait, arguments);
             if (nowait)
             {
                 protocolHandler.writeFrame(jmsConsume);
@@ -1220,7 +1233,7 @@
     {
         checkNotClosed();
         checkValidTopic(topic);
-        AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection);
+        AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
         TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
         if (subscriber != null)
         {
@@ -1247,8 +1260,8 @@
 
         subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
 
-        _subscriptions.put(name,subscriber);
-        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
 
         return subscriber;
     }
@@ -1278,8 +1291,8 @@
         AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
         BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
         TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
-        _subscriptions.put(name,subscriber);
-        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
         return subscriber;
     }
 
@@ -1476,7 +1489,14 @@
 
         bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
 
-        consumeFromQueue(consumer, queueName, protocolHandler, nowait);
+        try
+        {
+            consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
+        }
+        catch (JMSException e) //thrown by getMessageSelector
+        {
+            throw new AMQException(e.getMessage(), e);
+        }
     }
 
     /**
@@ -1489,7 +1509,7 @@
     {
         _consumers.remove(consumer.getConsumerTag());
         String subscriptionName = _reverseSubscriptionMap.remove(consumer);
-        if(subscriptionName != null)
+        if (subscriptionName != null)
         {
             _subscriptions.remove(subscriptionName);
         }
@@ -1497,7 +1517,7 @@
         Destination dest = consumer.getDestination();
         synchronized(dest)
         {
-            if(_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+            if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
             {
                 _destinationConsumerCount.remove(dest);
             }
@@ -1576,7 +1596,7 @@
         {
             throw new javax.jms.InvalidDestinationException("Invalid Topic");
         }
-        if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this)
+        if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this)
         {
             throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
         }
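
Note on the selector argument: the AMQSession hunks above pass the JMS selector to the broker as an entry in the consume arguments, under the key "x-filter-jms-selector", and only when a selector is present. The following is a minimal sketch of that step, not the Qpid implementation. It assumes a plain java.util.Map as a stand-in for Qpid's FieldTable, and the class name SelectorArguments and the helper buildConsumeArguments are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SelectorArguments {
    // Key string taken from the diff; the diff's own fixme suggests moving it to a common class.
    static final String JMS_SELECTOR_KEY = "x-filter-jms-selector";

    // Hypothetical helper: a plain Map stands in for Qpid's FieldTable here.
    static Map<String, Object> buildConsumeArguments(String messageSelector) {
        Map<String, Object> arguments = new LinkedHashMap<>();
        if (messageSelector != null) {
            // Only consumers that supplied a selector carry the filter argument.
            arguments.put(JMS_SELECTOR_KEY, messageSelector);
        }
        return arguments;
    }

    public static void main(String[] args) {
        System.out.println(buildConsumeArguments("price > 100"));
        System.out.println(buildConsumeArguments(null));
    }
}
```

A consumer created without a selector sends an empty argument table, so the broker side can treat "no key" as "no filter".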

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Dec 19 02:51:39 2006
@@ -23,6 +23,7 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQChannelClosedException;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
 import org.apache.qpid.client.AMQNoConsumersException;
 import org.apache.qpid.client.AMQNoRouteException;
 import org.apache.qpid.protocol.AMQConstant;
@@ -46,7 +47,7 @@
 
     public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
     {
-         _logger.debug("ChannelClose method received");
+        _logger.debug("ChannelClose method received");
         ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
 
         int errorCode = method.replyCode;
@@ -65,17 +66,21 @@
             {
                 throw new AMQNoConsumersException("Error: " + reason, null);
             }
+            else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+            {
+                throw new AMQNoRouteException("Error: " + reason, null);
+            }
+            else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode())
+            {
+                _logger.info("Broker responded with Invalid Selector.");
+
+                throw new AMQInvalidSelectorException(reason);
+            }
             else
             {
-                if (errorCode == AMQConstant.NO_ROUTE.getCode())
-                {
-                   throw new AMQNoRouteException("Error: " + reason, null);
-                }
-                else
-                {
-                    throw new AMQChannelClosedException(errorCode, "Error: " + reason);
-                }
+                throw new AMQChannelClosedException(errorCode, "Error: " + reason);
             }
+
         }
         evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
     }
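
Note on the reply-code handling: the hunk above flattens the nested if/else into a single else-if chain and adds a branch for the invalid-selector reply code. The sketch below shows the shape of that dispatch in isolation. It is illustrative only: the numeric codes are placeholders (the real values come from AMQConstant), and the exception classes are hypothetical stand-ins for the Qpid ones.

```java
public class CloseReplyDispatch {
    // Placeholder codes for illustration; NOT the real AMQConstant values.
    static final int NO_CONSUMERS = 1;
    static final int NO_ROUTE = 2;
    static final int INVALID_SELECTOR = 3;

    // Hypothetical stand-ins for the Qpid exception types named in the diff.
    static class NoConsumersException extends Exception {
        NoConsumersException(String msg) { super(msg); }
    }

    static class NoRouteException extends Exception {
        NoRouteException(String msg) { super(msg); }
    }

    static class InvalidSelectorException extends Exception {
        InvalidSelectorException(String msg) { super(msg); }
    }

    static class ChannelClosedException extends Exception {
        final int errorCode;

        ChannelClosedException(int errorCode, String msg) {
            super(msg);
            this.errorCode = errorCode;
        }
    }

    // Maps a channel-close reply code to the exception the client would raise.
    static Exception toException(int errorCode, String reason) {
        if (errorCode == NO_CONSUMERS) {
            return new NoConsumersException("Error: " + reason);
        } else if (errorCode == NO_ROUTE) {
            return new NoRouteException("Error: " + reason);
        } else if (errorCode == INVALID_SELECTOR) {
            // As in the diff, the broker's reason text is passed through unprefixed.
            return new InvalidSelectorException(reason);
        } else {
            return new ChannelClosedException(errorCode, "Error: " + reason);
        }
    }

    public static void main(String[] args) {
        System.out.println(toException(INVALID_SELECTOR, "bad selector").getClass().getSimpleName());
        System.out.println(toException(99, "unknown").getClass().getSimpleName());
    }
}
```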

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Dec 19 02:51:39 2006
@@ -26,7 +26,8 @@
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Tue Dec 19 02:51:39 2006
@@ -110,7 +110,7 @@
             }
             else
             {
-                throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught.
+                throw new AMQException("Woken up due to " + _error.getClass(), _error); // FIXME: This will wrap FailoverException and prevent it being caught.
             }
         }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Tue Dec 19 02:51:39 2006
@@ -59,7 +59,7 @@
         // once more testing of the performance of the simple allocator has been done
         if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
         {
-            _logger.warn("Using SimpleByteBufferAllocator");
+            _logger.info("Using SimpleByteBufferAllocator");
             ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
         }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Tue Dec 19 02:51:39 2006
@@ -40,11 +40,15 @@
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
 
 public class PropertiesFileInitialContextFactory implements InitialContextFactory
 {
-    protected final Logger _logger = Logger.getLogger(getClass());
+    protected final Logger _logger = Logger.getLogger(PropertiesFileInitialContextFactory.class);
 
     private String CONNECTION_FACTORY_PREFIX = "connectionfactory.";
     private String DESTINATION_PREFIX = "destination.";
@@ -54,6 +58,41 @@
     public Context getInitialContext(Hashtable environment) throws NamingException
     {
         Map data = new ConcurrentHashMap();
+
+        try
+        {
+
+            String file = null;
+            if (environment.containsKey(Context.PROVIDER_URL))
+            {
+                file = (String) environment.get(Context.PROVIDER_URL);
+            }
+            else
+            {
+                file = System.getProperty(Context.PROVIDER_URL);
+            }
+
+            if (file != null)
+            {
+                _logger.info("Loading Properties from:" + file);
+                //Load the properties specified
+                Properties p = new Properties();
+
+                p.load(new BufferedInputStream(new FileInputStream(file)));
+
+                environment.putAll(p);
+                _logger.info("Loaded Context Properties:" + environment.toString());
+            }
+            else
+            {
+                _logger.warn("No Provider URL specified.");
+            }
+        }
+        catch (IOException ioe)
+        {
+            _logger.warn("Unable to load property file specified in Provider_URL:" +
+                         environment.get(Context.PROVIDER_URL));
+        }
 
         createConnectionFactories(data, environment);
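
Note on the provider URL lookup: java.util.Hashtable.contains(Object) tests whether the argument is present as a value, while containsKey(Object) tests keys, so looking up Context.PROVIDER_URL needs the key form. The sketch below shows the lookup with a system-property fallback, plus a loader that closes its stream. It is a minimal illustration under stated assumptions, not the committed code: the class name ProviderUrlLookup and both helper methods are hypothetical.

```java
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Hashtable;
import java.util.Properties;
import javax.naming.Context;

public class ProviderUrlLookup {
    // Hypothetical helper: prefer the environment entry, else fall back to the system property.
    static String resolveFile(Hashtable<?, ?> environment) {
        if (environment.containsKey(Context.PROVIDER_URL)) {
            return (String) environment.get(Context.PROVIDER_URL);
        }
        return System.getProperty(Context.PROVIDER_URL);
    }

    // Hypothetical helper: load a properties file and always close the stream.
    static Properties load(String file) throws IOException {
        Properties p = new Properties();
        try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
            p.load(in);
        }
        return p;
    }

    public static void main(String[] args) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.PROVIDER_URL, "/path/to/jndi.properties"); // example path only
        System.out.println(resolveFile(env));
        // contains() looks at values, so the key is not found this way.
        System.out.println(env.contains(Context.PROVIDER_URL));
    }
}
```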
 

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java Tue Dec 19 02:51:39 2006
@@ -23,6 +23,8 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.cluster.util.LogMessage;
 
+import java.util.List;
+
 class ClusteredSubscriptionManager extends SubscriptionSet
 {
     private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
@@ -80,6 +82,11 @@
         public int getWeight()
         {
             return ClusteredSubscriptionManager.this.getWeight();
+        }
+
+        public List<Subscription> getSubscriptions()
+        {
+            return ClusteredSubscriptionManager.super.getSubscriptions();
         }
 
         public boolean hasActiveSubscribers()

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java Tue Dec 19 02:51:39 2006
@@ -21,12 +21,12 @@
 package org.apache.qpid.server.queue;
 
 import java.util.List;
+import java.util.LinkedList;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * Distributes messages among a list of subscription managers, using their
  * weighting.
- *
  */
 class NestedSubscriptionManager implements SubscriptionManager
 {
@@ -44,11 +44,24 @@
         _subscribers.remove(s);
     }
 
+
+    public List<Subscription> getSubscriptions()
+    {
+        List<Subscription> allSubs = new LinkedList<Subscription>();
+
+        for (WeightedSubscriptionManager subMans : _subscribers)
+        {
+            allSubs.addAll(subMans.getSubscriptions());
+        }
+
+        return allSubs;
+    }
+
     public boolean hasActiveSubscribers()
     {
-        for(WeightedSubscriptionManager s : _subscribers)
+        for (WeightedSubscriptionManager s : _subscribers)
         {
-            if(s.hasActiveSubscribers())
+            if (s.hasActiveSubscribers())
             {
                 return true;
             }
@@ -59,9 +72,9 @@
     public Subscription nextSubscriber(AMQMessage msg)
     {
         WeightedSubscriptionManager start = current();
-        for(WeightedSubscriptionManager s = start; s != null; s = next(start))
+        for (WeightedSubscriptionManager s = start; s != null; s = next(start))
         {
-            if(hasMore(s))
+            if (hasMore(s))
             {
                 return nextSubscriber(s);
             }
@@ -94,7 +107,7 @@
     private WeightedSubscriptionManager next()
     {
         _iterations = 0;
-        if(++_index >= _subscribers.size())
+        if (++_index >= _subscribers.size())
         {
             _index = 0;
         }

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Tue Dec 19 02:51:39 2006
@@ -25,6 +25,9 @@
 import org.apache.qpid.server.cluster.SimpleSendable;
 import org.apache.qpid.AMQException;
 
+import java.util.Queue;
+import java.util.List;
+
 class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
 {
     private final GroupManager _groupMgr;
@@ -76,6 +79,11 @@
         return _count;
     }
 
+    public List<Subscription> getSubscriptions()
+    {
+        return null;  // Stub: not yet implemented.
+    }
+
     public boolean hasActiveSubscribers()
     {
         return getWeight() == 0;
@@ -88,9 +96,34 @@
 
     public void queueDeleted(AMQQueue queue)
     {
-        if(queue instanceof ClusteredQueue)
+        if (queue instanceof ClusteredQueue)
         {
             ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
         }
+    }
+
+    public boolean hasFilters()
+    {
+        return false;
+    }
+
+    public boolean hasInterest(AMQMessage msg)
+    {
+        return true;
+    }
+
+    public Queue<AMQMessage> getPreDeliveryQueue()
+    {
+        return null;
+    }
+
+    public void enqueueForPreDelivery(AMQMessage msg)
+    {
+        //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
+    }
+
+    public void sendNextMessage(AMQQueue queue)
+    {
+
     }
 }

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties?view=auto&rev=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties Tue Dec 19 02:51:39 2006
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java?view=diff&rev=488624&r1=488623&r2=488624
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java Tue Dec 19 02:51:39 2006
@@ -483,7 +483,7 @@
     {
         return _properties.containsKey(name) && (_properties.get(name) == null) &&
                _propertyNamesTypeMap.get(name).equals(Prefix.AMQP_NULL_STRING_PROPERTY_PREFIX);
-                
+
 
     }
 
@@ -606,7 +606,8 @@
         // AMQ start character
         if (!(Character.isLetter(propertyName.charAt(0))
               || propertyName.charAt(0) == '$'
-              || propertyName.charAt(0) == '#'))
+              || propertyName.charAt(0) == '#'
+              || propertyName.charAt(0) == '_')) // Not official AMQP; added for JMS.
         {
             throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid AMQP start character");
         }
@@ -1156,9 +1157,9 @@
             if (type == null)
             {
                 String msg = "Field '" + key + "' - unsupported field table type: " + type + ".";
-                    //some extra trace information...
-                    msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining;
-                    throw new AMQFrameDecodingException(msg);
+                //some extra trace information...
+                msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining;
+                throw new AMQFrameDecodingException(msg);
             }
             Object value;
 
@@ -1203,7 +1204,7 @@
                     value = EncodingUtils.readBytes(buffer);
                     break;
                 default:
-                    String msg = "Internal error, the following type identifier is not handled: " + type;                                        
+                    String msg = "Internal error, the following type identifier is not handled: " + type;
                     throw new AMQFrameDecodingException(msg);
             }