You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2006/12/06 11:59:29 UTC

svn commit: r483025 - in /incubator/qpid/branches/jmsselectors: ./ java/ java/broker/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/handler/ java/broker/src/main/java/org/apache/qpid/server/queue/ ja...

Author: ritchiem
Date: Wed Dec  6 02:59:27 2006
New Revision: 483025

URL: http://svn.apache.org/viewvc?view=rev&rev=483025
Log:
Moved Selector work to a branch

Added:
    incubator/qpid/branches/jmsselectors/
      - copied from r480507, incubator/qpid/trunk/qpid/
    incubator/qpid/branches/jmsselectors/java/
      - copied from r482736, incubator/qpid/trunk/qpid/java/
    incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
      - copied, changed from r483021, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Modified:
    incubator/qpid/branches/jmsselectors/java/broker/pom.xml
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    incubator/qpid/branches/jmsselectors/java/client/pom.xml
    incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
    incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
    incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
    incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Listener.java
    incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/MessageFactory.java
    incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Publisher.java
    incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
    incubator/qpid/branches/jmsselectors/java/common/pom.xml
    incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
    incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java
    incubator/qpid/branches/jmsselectors/java/common/src/main/xsl/java.xsl
    incubator/qpid/branches/jmsselectors/java/pom.xml
    incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
    incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
    incubator/qpid/branches/jmsselectors/specs/amqp-8.0.xml

Modified: incubator/qpid/branches/jmsselectors/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/pom.xml?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/pom.xml (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/pom.xml Wed Dec  6 02:59:27 2006
@@ -53,6 +53,10 @@
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
         </dependency>
+          <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jms_1.1_spec</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.mina</groupId>
             <artifactId>mina-filter-ssl</artifactId>
@@ -81,6 +85,26 @@
 
     <build>
         <plugins>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>javacc-maven-plugin</artifactId>
+                <version>2.0</version>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <configuration>
+                            <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory>
+                            <outputDirectory>${basedir}/target/generated</outputDirectory>
+                            <packageName>org.apache.qpid.server.filter.jms.selector</packageName>
+                        </configuration>
+                        <goals>
+                            <goal>javacc</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Dec  6 02:59:27 2006
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.ack.TxAck;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -290,7 +291,7 @@
      * @throws ConsumerTagNotUniqueException if the tag is not unique
      * @throws AMQException                  if something goes wrong
      */
-    public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException
+    public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable fitler) throws AMQException, ConsumerTagNotUniqueException
     {
         if (tag == null)
         {
@@ -301,7 +302,8 @@
             throw new ConsumerTagNotUniqueException();
         }
 
-        queue.registerProtocolSession(session, _channelId, tag, acks);
+
+        queue.registerProtocolSession(session, _channelId, tag, acks,fitler);
         _consumerTag2QueueMap.put(tag, queue);
         return tag;
     }

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Wed Dec  6 02:59:27 2006
@@ -74,7 +74,7 @@
             }
             try
             {
-                String consumerTag = channel.subscribeToQueue(body.consumerTag, queue,  session, !body.noAck);
+                String consumerTag = channel.subscribeToQueue(body.consumerTag, queue,  session, !body.noAck, body.filter);
                 if(!body.nowait)
                 {
                     session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed Dec  6 02:59:27 2006
@@ -25,6 +25,8 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.message.jms.JMSMessage;
 import org.apache.qpid.AMQException;
 
 import java.util.ArrayList;
@@ -33,17 +35,20 @@
 import java.util.Set;
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Combines the information that make up a deliverable message into a more manageable form.
  */
 public class AMQMessage
 {
+    public static final String JMS_MESSAGE = "jms.message";
+
     private final Set<Object> _tokens = new HashSet<Object>();
 
     private AMQProtocolSession _publisher;
 
-    private final BasicPublishBody  _publishBody;
+    private final BasicPublishBody _publishBody;
 
     private ContentHeaderBody _contentHeaderBody;
 
@@ -83,6 +88,7 @@
      * messages published with the 'immediate' flag.
      */
     private boolean _deliveredToConsumer;
+    private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
 
 
     public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
@@ -96,17 +102,19 @@
         _publishBody = publishBody;
         _store = messageStore;
         _contentBodies = new LinkedList<ContentBody>();
+        _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
         _storeWhenComplete = storeWhenComplete;
     }
 
     public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
                       ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
             throws AMQException
-    
+
     {
         _publishBody = publishBody;
         _contentHeaderBody = contentHeaderBody;
         _contentBodies = contentBodies;
+        _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
         _messageId = messageId;
         _store = store;
         storeMessage();
@@ -116,7 +124,7 @@
                       ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
             throws AMQException
     {
-        this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);        
+        this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);
     }
 
     protected AMQMessage(AMQMessage msg) throws AMQException
@@ -270,7 +278,7 @@
             {
                 _store.removeMessage(_messageId);
             }
-            catch(AMQException e)
+            catch (AMQException e)
             {
                 //to maintain consistency, we revert the count
                 incrementReference();
@@ -291,7 +299,7 @@
 
     public boolean checkToken(Object token)
     {
-        if(_tokens.contains(token))
+        if (_tokens.contains(token))
         {
             return true;
         }
@@ -307,7 +315,7 @@
         //if the message is not persistent or the queue is not durable
         //we will not need to recover the association and so do not
         //need to record it
-        if(isPersistent() && queue.isDurable())
+        if (isPersistent() && queue.isDurable())
         {
             _store.enqueueMessage(queue.getName(), _messageId);
         }
@@ -317,7 +325,7 @@
     {
         //only record associations where both queue and message will survive
         //a restart, so only need to remove association if this is the case
-        if(isPersistent() && queue.isDurable())
+        if (isPersistent() && queue.isDurable())
         {
             _store.dequeueMessage(queue.getName(), _messageId);
         }
@@ -325,14 +333,14 @@
 
     public boolean isPersistent() throws AMQException
     {
-        if(_contentHeaderBody == null)
+        if (_contentHeaderBody == null)
         {
             throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
         }
 
         //todo remove literal values to a constant file such as AMQConstants in common
         return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
-                &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+               && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
     }
 
     public void setTxnBuffer(TxnBuffer buffer)
@@ -346,13 +354,15 @@
     }
 
     /**
-     * Called to enforce the 'immediate' flag. 
+     * Called to enforce the 'immediate' flag.
+     *
      * @throws NoConsumersException if the message is marked for
-     * immediate delivery but has not been marked as delivered to a
-     * consumer
+     *                              immediate delivery but has not been marked as delivered to a
+     *                              consumer
      */
-    public void checkDeliveredToConsumer() throws NoConsumersException{
-        if(isImmediate() && !_deliveredToConsumer)
+    public void checkDeliveredToConsumer() throws NoConsumersException
+    {
+        if (isImmediate() && !_deliveredToConsumer)
         {
             throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
         }
@@ -362,7 +372,43 @@
      * Called when this message is delivered to a consumer. (used to
      * implement the 'immediate' flag functionality).
      */
-    public void setDeliveredToConsumer(){
+    public void setDeliveredToConsumer()
+    {
         _deliveredToConsumer = true;
+    }
+
+
+    public MessageDecorator getDecodedMessage(String type)
+    {
+        MessageDecorator msgtype = null;
+
+        if (_decodedMessages != null)
+        {
+            msgtype = _decodedMessages.get(type);
+
+            if (msgtype == null)
+            {
+                msgtype = decorateMessage(type);
+            }
+        }
+
+        return msgtype;
+    }
+
+    private MessageDecorator decorateMessage(String type)
+    {
+        MessageDecorator msgdec = null;
+
+        if (type.equals(JMS_MESSAGE))
+        {
+            msgdec = new JMSMessage(this);
+        }
+
+        if (msgdec != null)
+        {
+            _decodedMessages.put(type, msgdec);
+        }
+
+        return msgdec;
     }
 }

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Dec  6 02:59:27 2006
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.MBeanConstructor;
@@ -141,8 +142,8 @@
 
         // OpenMBean data types for viewMessageContent method
         private CompositeType _msgContentType = null;
-        private String[]      _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
-        private OpenType[]    _msgContentAttributeTypes = new OpenType[4];
+        private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
+        private OpenType[] _msgContentAttributeTypes = new OpenType[4];
 
         @MBeanConstructor("Creates an MBean exposing an AMQQueue")
         public AMQQueueMBean() throws JMException
@@ -162,14 +163,14 @@
             _msgContentAttributeTypes[2] = SimpleType.STRING;                  // For Encoding
             _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE);  // For message content
             _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
-                                                 _msgContentAttributes, _msgContentAttributeTypes);
+                                                _msgContentAttributes, _msgContentAttributeTypes);
 
             _msgAttributeTypes[0] = SimpleType.LONG;                      // For message id
             _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING);  // For header attributes
             _msgAttributeTypes[2] = SimpleType.LONG;                      // For size
             _msgAttributeTypes[3] = SimpleType.BOOLEAN;                   // For redelivered
 
-            _messageDataType = new CompositeType("Message","AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
+            _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
             _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
         }
 
@@ -265,7 +266,7 @@
             {
                 queueDepth = queueDepth + getMessageSize(message);
             }
-            return (long)Math.round(queueDepth / 1000);
+            return (long) Math.round(queueDepth / 1000);
         }
 
         /**
@@ -314,7 +315,7 @@
         private void notifyClients(String notificationMsg)
         {
             Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
-                                 ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+                                              ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
 
             _broadcaster.sendNotification(n);
         }
@@ -361,7 +362,7 @@
 
             if (msg == null)
             {
-                throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName );
+                throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
             }
             // get message content
             List<ContentBody> cBodies = msg.getContentBodies();
@@ -379,7 +380,7 @@
             }
 
             // Create header attributes list
-            BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties;
+            BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
             String mimeType = headerProperties.getContentType();
             String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
             Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
@@ -402,12 +403,12 @@
             TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
 
             // Create the tabular list of message header contents
-            for (int i = beginIndex; i <= endIndex && i <=  list.size(); i++)
+            for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
             {
                 AMQMessage msg = list.get(i - 1);
-                ContentHeaderBody headerBody =  msg.getContentHeaderBody();
+                ContentHeaderBody headerBody = msg.getContentHeaderBody();
                 // Create header attributes list
-                BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)headerBody.properties;
+                BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
                 List<String> headerAttribsList = new ArrayList<String>();
                 headerAttribsList.add("App Id=" + headerProperties.getAppId());
                 headerAttribsList.add("MimeType=" + headerProperties.getContentType());
@@ -430,7 +431,7 @@
         @Override
         public MBeanNotificationInfo[] getNotificationInfo()
         {
-            String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+            String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
             String name = MonitorNotification.class.getName();
             String description = "Either Message count or Queue depth or Message size has reached threshold high value";
             MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
@@ -581,12 +582,12 @@
         _bindings.addBinding(routingKey, exchange);
     }
 
-    public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks)
+    public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters)
             throws AMQException
     {
         debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
 
-        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks);
+        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters);
         _subscribers.addSubscriber(subscription);
     }
 

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Wed Dec  6 02:59:27 2006
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.MessageFilter;
 
 public interface Subscription
 {
@@ -29,4 +30,8 @@
     boolean isSuspended();
 
     void queueDeleted(AMQQueue queue);
+
+    boolean hasFilters();
+
+    boolean hasInterest(AMQMessage msg);
 }

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java Wed Dec  6 02:59:27 2006
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
 
 /**
  * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
@@ -32,6 +33,9 @@
  */
 public interface SubscriptionFactory
 {
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters)
+        throws AMQException;
+
     Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
         throws AMQException;
 

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Wed Dec  6 02:59:27 2006
@@ -26,7 +26,10 @@
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 
 /**
@@ -52,19 +55,25 @@
      * True if messages need to be acknowledged
      */
     private final boolean _acks;
+    private FilterManager _filters;
 
     public static class Factory implements SubscriptionFactory
     {
+        public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+        {
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters);
+        }
+
         public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
                 throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null);
         }
 
         public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
                 throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null);
         }
     }
 
@@ -72,6 +81,13 @@
                             String consumerTag, boolean acks)
             throws AMQException
     {
+        this(channelId, protocolSession, consumerTag, acks, null);
+    }
+
+    public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+                            String consumerTag, boolean acks, FieldTable filters)
+            throws AMQException
+    {
         AMQChannel channel = protocolSession.getChannel(channelId);
         if (channel == null)
         {
@@ -83,14 +99,9 @@
         this.consumerTag = consumerTag;
         sessionKey = protocolSession.getKey();
         _acks = acks;
+        _filters = FilterManagerFactory.createManager(filters);
     }
 
-    public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
-                            String consumerTag)
-            throws AMQException
-    {
-        this(channel, protocolSession, consumerTag, false);
-    }
 
     public boolean equals(Object o)
     {
@@ -131,7 +142,7 @@
         {
             // if we do not need to wait for client acknowledgements
             // we can decrement the reference count immediately. 
-            
+
             // By doing this _before_ the send we ensure that it
             // doesn't get sent if it can't be dequeued, preventing
             // duplicate delivery on recovery.
@@ -176,6 +187,16 @@
     public void queueDeleted(AMQQueue queue)
     {
         channel.queueDeleted(queue);
+    }
+
+    public boolean hasFilters()
+    {
+        return _filters != null;
+    }
+
+    public boolean hasInterest(AMQMessage msg)
+    {
+        return _filters.allAllow(msg);
     }
 
     private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Wed Dec  6 02:59:27 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
 import java.util.List;
 import java.util.ListIterator;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -58,6 +60,7 @@
 
     /**
      * Remove the subscription, returning it if it was found
+     *
      * @param subscription
      * @return null if no match was found
      */
@@ -90,7 +93,7 @@
 
     /**
      * Return the next unsuspended subscription or null if not found.
-     *
+     * <p/>
      * Performance note:
      * This method can scan all items twice when looking for a subscription that is not
      * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
@@ -105,29 +108,59 @@
             return null;
         }
 
-        try {
-            final Subscription result = nextSubscriber();
-            if (result == null) {
+        try
+        {
+            final Subscription result = nextSubscriberImpl(msg);
+            if (result == null)
+            {
                 _currentSubscriber = 0;
-                return nextSubscriber();
-            } else {
+                return nextSubscriberImpl(msg);
+            }
+            else
+            {
                 return result;
             }
-        } catch (IndexOutOfBoundsException e) {
+        }
+        catch (IndexOutOfBoundsException e)
+        {
             _currentSubscriber = 0;
-            return nextSubscriber();
+            return nextSubscriber(msg);
         }
     }
 
-    private Subscription nextSubscriber()
+    private Subscription nextSubscriberImpl(AMQMessage msg)
     {
         final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
-        while (iterator.hasNext()) {
+        while (iterator.hasNext())
+        {
             Subscription subscription = iterator.next();
             ++_currentSubscriber;
             subscriberScanned();
-            if (!subscription.isSuspended()) {
-                return subscription;
+
+            if (!subscription.isSuspended())
+            {
+
+                if ((!subscription.hasFilters()) || (subscription.hasFilters() && subscription.hasInterest(msg)))
+                {
+                    return subscription;
+                }
+                // 2006-12-04 : It is fairer to simply skip a subscriber that isn't interested,
+                // although this approach does need to be looked at again.
+
+//                else
+//                {
+//                    //Don't penalise a subscriber for not wanting this message.
+//                    // Penalising would introduce unfairness; sticking with the current subscriber
+//                    // will allow the next message to match, although this could lead to unfairness if:
+//                    // subscribers: a(bin) b(text) c(text)
+//                    // msgs : 1(text) 2(text) 3(bin)
+//                    // Subscriber c won't get any messages, as the first two text msgs will go to b and then a will get
+//                    // the bin msg.
+//                    // Never said this was fair round-robin-ing.
+//                    //FIXME - Make a fair round robin.
+//
+//                    --_currentSubscriber;
+//                }
             }
         }
         return null;
@@ -149,7 +182,10 @@
     {
         for (Subscription s : _subscriptions)
         {
-            if (!s.isSuspended()) return true;
+            if (!s.isSuspended())
+            {
+                return true;
+            }
         }
         return false;
     }
@@ -159,7 +195,10 @@
         int count = 0;
         for (Subscription s : _subscriptions)
         {
-            if (!s.isSuspended()) count++;
+            if (!s.isSuspended())
+            {
+                count++;
+            }
         }
         return count;
     }
@@ -167,9 +206,10 @@
     /**
      * Notification that a queue has been deleted. This is called so that the subscription can inform the
      * channel, which in turn can update its list of unacknowledged messages.
+     *
      * @param queue
      */
-    public void queueDeleted(AMQQueue queue)
+    public void queueDeleted(AMQQueue queue) throws AMQException
     {
         for (Subscription s : _subscriptions)
         {
@@ -177,7 +217,8 @@
         }
     }
 
-    int size() {
+    int size()
+    {
         return _subscriptions.size();
     }
 }

Modified: incubator/qpid/branches/jmsselectors/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/pom.xml?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/pom.xml (original)
+++ incubator/qpid/branches/jmsselectors/java/client/pom.xml Wed Dec  6 02:59:27 2006
@@ -122,7 +122,9 @@
                     </includes>
                     <excludes>
                         <exclude>**/JNDIReferenceableTest.java</exclude>
+			<exclude>**/DurableSubscriptionTest.java</exclude>
                     </excludes>
+	
                 </configuration>
             </plugin>
         </plugins>

Modified: incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Dec  6 02:59:27 2006
@@ -829,14 +829,16 @@
                 AMQDestination amqd = (AMQDestination) destination;
 
                 final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
-                // TODO: construct the rawSelector from the selector string if rawSelector == null
+
                 final FieldTable ft = FieldTableFactory.newFieldTable();
-                //if (rawSelector != null)
-                //    ft.put("headers", rawSelector.getDataAsBytes());
+
+                // Add headers for headers exchange
                 if (rawSelector != null)
                 {
                     ft.putAll(rawSelector);
                 }
+
+
                 BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
                                                                          _messageFactoryRegistry, AMQSession.this,
                                                                          protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
@@ -915,6 +917,18 @@
         protocolHandler.writeFrame(queueBind);
     }
 
+//    /**
+//     * Register to consume from the queue.
+//     *
+//     * @param queueName
+//     * @return the consumer tag generated by the broker
+//     */
+//    private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow,
+//                                    boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException
+//    {
+//        return consumeFromQueue(queueName, protocolHandler, prefetchHigh, prefetchLow, noLocal, exclusive, acknowledgeMode, null);
+//    }
+
     /**
      * Register to consume from the queue.
      *
@@ -922,16 +936,25 @@
      * @return the consumer tag generated by the broker
      */
     private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow,
-                                    boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException
+                                    boolean noLocal, boolean exclusive, int acknowledgeMode, String messageSelector) throws AMQException
     {
         //fixme prefetch values are not used here. Do we need to have them as parametsrs?
         //need to generate a consumer tag on the client so we can exploit the nowait flag
         String tag = Integer.toString(_nextTag++);
 
+        FieldTable ft = new FieldTable();
+
+        if (messageSelector != null)
+        {
+            //fixme move literal value to a common class.
+            ft.put("x-filter-jms-selector", messageSelector);
+        }
+
         AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
                                                               queueName, tag, noLocal,
                                                               acknowledgeMode == Session.NO_ACKNOWLEDGE,
-                                                              exclusive, true);
+                                                              exclusive, true, ft);
+
 
         protocolHandler.writeFrame(jmsConsume);
         return tag;
@@ -1218,11 +1241,22 @@
 
         bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
 
-        String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(),
-                                              consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode());
+        String consumerTag = null;
+        try
+        {
+            consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(),
+                                           consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode(),
+                                           consumer.getMessageSelector());
+
+            consumer.setConsumerTag(consumerTag);
+            _consumers.put(consumerTag, consumer);
+        }
+        catch (JMSException e)
+        {
+            // getMessageSelector is declared to throw JMSException, but it simply returns a String, so this should not happen.
+        }
+
 
-        consumer.setConsumerTag(consumerTag);
-        _consumers.put(consumerTag, consumer);
     }
 
     /**

Copied: incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (from r483021, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=483025&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java&r1=483021&p2=incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java&r2=483025
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Wed Dec  6 02:59:27 2006
@@ -58,7 +58,8 @@
         {
             _data.acquire();
         }
-        _readableProperties = false;
+        // ContentHeaderProperties are just created and so are empty
+        //_readableProperties = (_contentHeaderProperties != null);
         _readableMessage = (data != null);
     }
 
@@ -424,7 +425,15 @@
             buf.append("\nJMS priority: ").append(getJMSPriority());
             buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode());
             buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
+            buf.append("\nJMS Type: ").append(String.valueOf(getJMSType()));
+            buf.append("\nJMS CorrelationID: ").append(String.valueOf(getJMSCorrelationID()));
+            buf.append("\nJMS Destination: NOT IMPLEMENTED");//.append(String.valueOf(getJMSDestination()));
+            buf.append("\nJMS MessageID: ").append(String.valueOf(getJMSMessageID()));
+            buf.append("\nJMS Redelivered: ").append(String.valueOf(getJMSRedelivered()));
+            buf.append("\nProperty Names: ").append(String.valueOf(getPropertyNames()));
+
             buf.append("\nAMQ message number: ").append(_deliveryTag);
+
             buf.append("\nProperties:");
             if (getJmsContentHeaderProperties().getHeaders().isEmpty())
             {

Modified: incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java Wed Dec  6 02:59:27 2006
@@ -24,8 +24,6 @@
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
 import org.apache.qpid.test.VMBrokerSetup;
 
 import javax.jms.JMSException;
@@ -104,7 +102,7 @@
 
     public static void main(String[] argv) throws Exception
     {
-        SessionStartTest test = new SessionStartTest();
+        SelectorTest test = new SelectorTest();
         test._connectionString = argv.length == 0 ? "localhost:5672" : argv[0];
         test.setUp();
         test.test();
@@ -112,6 +110,6 @@
 
     public static junit.framework.Test suite()
     {
-        return new VMBrokerSetup(new junit.framework.TestSuite(SessionStartTest.class));
+        return new VMBrokerSetup(new junit.framework.TestSuite(SelectorTest.class));
     }
 }

Modified: incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java Wed Dec  6 02:59:27 2006
@@ -21,11 +21,8 @@
 package org.apache.qpid.test.unit.basic;
 
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.client.message.JMSTextMessage;
 import org.apache.qpid.test.VMBrokerSetup;
 import org.apache.log4j.Logger;

Modified: incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java Wed Dec  6 02:59:27 2006
@@ -50,6 +50,7 @@
     protected void setUp() throws Exception
     {
         super.setUp();
+        TransportConnection.createVMBroker(1);
         _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "/test");
         _topic = new AMQTopic("mytopic");
         _queue = new AMQQueue("myqueue");

Modified: incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Listener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Listener.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Listener.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Listener.java Wed Dec  6 02:59:27 2006
@@ -45,10 +45,10 @@
     {
         _connection = connection;
         _session = connection.createSession(false, ackMode);
-        _factory = new MessageFactory(_session);
+        _factory = new MessageFactory(_session, "topictest.messages.#", 256);
 
         //register for events
-        if(name == null)
+        if (name == null)
         {
             _factory.createTopicConsumer().setMessageListener(this);
         }
@@ -61,9 +61,10 @@
 
         _controller = _factory.createControlPublisher();
         System.out.println("Waiting for messages " +
-                Config.getAckModeDescription(ackMode)
-                + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")")
-                + "...");
+                           Config.getAckModeDescription(ackMode)
+                           + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")")
+                           + " with Messages on Topic: " + _factory.getTopic()                           
+                           + "...");
 
     }
 
@@ -75,7 +76,7 @@
             _connection.stop();
             _connection.close();
         }
-        catch(Exception e)
+        catch (Exception e)
         {
             e.printStackTrace(System.out);
         }
@@ -89,7 +90,7 @@
             _controller.send(_factory.createReportResponseMessage(msg));
             System.out.println("Sent report: " + msg);
         }
-        catch(Exception e)
+        catch (Exception e)
         {
             e.printStackTrace(System.out);
         }
@@ -103,18 +104,18 @@
 
     public void onMessage(Message message)
     {
-        if(!init)
+        if (!init)
         {
             start = System.currentTimeMillis();
             count = 0;
             init = true;
         }
 
-        if(_factory.isShutdown(message))
+        if (_factory.isShutdown(message))
         {
             shutdown();
         }
-        else if(_factory.isReport(message))
+        else if (_factory.isReport(message))
         {
             //send a report:
             report();
@@ -132,7 +133,7 @@
         config.setOptions(argv);
 
         Connection con = config.createConnection();
-        if(config.getClientId() != null)
+        if (config.getClientId() != null)
         {
             con.setClientID(config.getClientId());
         }

Modified: incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/MessageFactory.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/MessageFactory.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/MessageFactory.java Wed Dec  6 02:59:27 2006
@@ -23,6 +23,7 @@
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
 
+
 import javax.jms.*;
 
 /**
@@ -42,22 +43,28 @@
         this(session, 256);
     }
 
-    MessageFactory(Session session, int size) throws JMSException
+    public MessageFactory(Session session, int size) throws JMSException
+    {
+        this(session, "topictest.messages", size);
+    }
+
+
+    MessageFactory(Session session, String topicMessages, int size) throws JMSException
     {
         _session = session;
-        if(session instanceof AMQSession)
+        if (session instanceof AMQSession)
         {
-            _topic = new AMQTopic("topictest.messages");
+            _topic = new AMQTopic(topicMessages);
             _control = new AMQTopic("topictest.control");
         }
         else
         {
-            _topic = session.createTopic("topictest.messages");
+            _topic = session.createTopic(topicMessages);
             _control = session.createTopic("topictest.control");
         }
         _payload = new byte[size];
 
-        for(int i = 0; i < size; i++)
+        for (int i = 0; i < size; i++)
         {
             _payload[i] = (byte) DATA[i % DATA.length];
         }

Modified: incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Publisher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Publisher.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Publisher.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Publisher.java Wed Dec  6 02:59:27 2006
@@ -51,7 +51,7 @@
         _factory.createControlConsumer().setMessageListener(this);
         _connection.start();
 
-        if(warmup > 0)
+        if (warmup > 0)
         {
             System.out.println("Runing warmup (" + warmup + " msgs)");
             long time = batch(warmup, consumerCount);
@@ -59,11 +59,14 @@
         }
 
         long[] times = new long[batches];
-        for(int i = 0; i < batches; i++)
+        for (int i = 0; i < batches; i++)
         {
-            if(i > 0) Thread.sleep(delay*1000);
+            if (i > 0)
+            {
+                Thread.sleep(delay * 1000);
+            }
             times[i] = batch(msgCount, consumerCount);
-            System.out.println("Batch " + (i+1) + " of " + batches + " completed in " + times[i] + " ms.");
+            System.out.println("Batch " + (i + 1) + " of " + batches + " completed in " + times[i] + " ms.");
         }
 
         long min = min(times);
@@ -106,7 +109,7 @@
     private void waitForCompletion(int consumers) throws Exception
     {
         System.out.println("Waiting for completion...");
-        synchronized (_lock)
+        synchronized(_lock)
         {
             while (_count > 0)
             {
@@ -121,7 +124,7 @@
         System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining");
         if (_count == 0)
         {
-            synchronized (_lock)
+            synchronized(_lock)
             {
                 _lock.notify();
             }
@@ -131,7 +134,7 @@
     static long min(long[] times)
     {
         long min = times.length > 0 ? times[0] : 0;
-        for(int i = 0; i < times.length; i++)
+        for (int i = 0; i < times.length; i++)
         {
             min = Math.min(min, times[i]);
         }
@@ -141,7 +144,7 @@
     static long max(long[] times)
     {
         long max = times.length > 0 ? times[0] : 0;
-        for(int i = 0; i < times.length; i++)
+        for (int i = 0; i < times.length; i++)
         {
             max = Math.max(max, times[i]);
         }
@@ -151,14 +154,25 @@
     static long avg(long[] times, long min, long max)
     {
         long sum = 0;
-        for(int i = 0; i < times.length; i++)
+        for (int i = 0; i < times.length; i++)
         {
             sum += times[i];
         }
-        sum -= min;
-        sum -= max;
 
-        return (sum / (times.length - 2));
+        int divisor = times.length;
+        //remove max and min from averages
+        if (times.length > 2)
+        {
+            sum -= min;
+            sum -= max;
+            divisor -= 2;
+        }
+        else
+        {
+            System.out.println("More batches are required to generate a meaninful average.");
+        }
+
+        return (sum / divisor);
     }
 
     public static void main(String[] argv) throws Exception

Modified: incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Wed Dec  6 02:59:27 2006
@@ -88,9 +88,19 @@
 
     public void queueDeleted(AMQQueue queue)
     {
-        if(queue instanceof ClusteredQueue)
+        if (queue instanceof ClusteredQueue)
         {
             ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
         }
+    }
+
+    public boolean hasFilters()
+    {
+        return false;
+    }
+
+    public boolean hasInterest(AMQMessage msg)
+    {
+        return true;
     }
 }

Modified: incubator/qpid/branches/jmsselectors/java/common/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/common/pom.xml?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/common/pom.xml (original)
+++ incubator/qpid/branches/jmsselectors/java/common/pom.xml Wed Dec  6 02:59:27 2006
@@ -46,7 +46,7 @@
     </properties>
 
     <build>
-        <plugins>          
+        <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-antrun-plugin</artifactId>
@@ -77,6 +77,19 @@
         </plugins>
     </build>
 
+    <repositories>
+     <repository>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+            <id>java.net repository</id>
+            <name>Maven 1.x Repository</name>
+            <url>https://maven-repository.dev.java.net/nonav/repository/</url>
+            <layout>legacy</layout>
+        </repository>
+
+
+    </repositories>
     <dependencies>
         <dependency>
             <groupId>log4j</groupId>
@@ -86,7 +99,16 @@
             <groupId>org.apache.mina</groupId>
             <artifactId>mina-core</artifactId>
         </dependency>
+        <!-- dependency>
+            <groupId>grizzly</groupId>
+            <artifactId>grizzly</artifactId>
+            <version>1.0.4</version>
+        </dependency -->
          <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jms_1.1_spec</artifactId>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
         </dependency>

Modified: incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLHelper.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLHelper.java (original)
+++ incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLHelper.java Wed Dec  6 02:59:27 2006
@@ -64,9 +64,9 @@
                 if (valueIndex + 1 < options.length())
                 {
                     if (options.charAt(valueIndex + 1) == DEFAULT_OPTION_SEPERATOR ||
-                            options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR ||
-                            options.charAt(valueIndex + 1) == BROKER_SEPARATOR ||
-                            options.charAt(valueIndex + 1) == '\'')
+                        options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR ||
+                        options.charAt(valueIndex + 1) == BROKER_SEPARATOR ||
+                        options.charAt(valueIndex + 1) == '\'')
                     {
                         nestedQuotes--;
 //                        System.out.println(
@@ -119,7 +119,7 @@
             else
             {
                 parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" +
-                        options.charAt(sepIndex) + "'", options);
+                                     options.charAt(sepIndex) + "'", options);
             }
         }
 

Modified: incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java (original)
+++ incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java Wed Dec  6 02:59:27 2006
@@ -62,12 +62,12 @@
 
         if (getIndex() > -1)
         {
-            if (_length != -1)
+            if (_length > 1)
             {
                 sb.append(" between indicies ");
                 sb.append(getIndex());
                 sb.append(" and ");
-                sb.append(_length);
+                sb.append(getIndex() + _length);
             }
             else
             {

Modified: incubator/qpid/branches/jmsselectors/java/common/src/main/xsl/java.xsl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/common/src/main/xsl/java.xsl?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/common/src/main/xsl/java.xsl (original)
+++ incubator/qpid/branches/jmsselectors/java/common/src/main/xsl/java.xsl Wed Dec  6 02:59:27 2006
@@ -69,7 +69,7 @@
 import org.apache.qpid.framing.EncodingUtils;
 import org.apache.qpid.framing.FieldTable;
 
-/**
+    /**
  * This class is autogenerated, do not modify. [From <xsl:value-of select="$f/parent::frames/@protocol"/>]
  */
 public class <xsl:value-of select="$f/@name"/> extends AMQMethodBody implements EncodableAMQDataBlock

Modified: incubator/qpid/branches/jmsselectors/java/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/pom.xml?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/pom.xml (original)
+++ incubator/qpid/branches/jmsselectors/java/pom.xml Wed Dec  6 02:59:27 2006
@@ -433,6 +433,8 @@
           </snapshots>
         </repository>
         -->
+
+
     </repositories>
 
     <pluginRepositories>

Modified: incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java (original)
+++ incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java Wed Dec  6 02:59:27 2006
@@ -90,7 +90,8 @@
 
     /**
      * Delivers messages to a number of queues.
-     * @param count the number of messages to deliver
+     *
+     * @param count  the number of messages to deliver
      * @param queues the list of queues
      * @throws NoConsumersException
      */
@@ -121,7 +122,7 @@
             q.bind("routingKey", exchange);
             try
             {
-                q.registerProtocolSession(createSession(), 1, "1", false);
+                q.registerProtocolSession(createSession(), 1, "1", false, null);
             }
             catch (Exception e)
             {
@@ -135,7 +136,7 @@
     static AMQQueue createQueue(String name) throws AMQException
     {
         return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(),
-                new OnCurrentThreadExecutor());
+                            new OnCurrentThreadExecutor());
     }
 
     static AMQProtocolSession createSession() throws Exception

Modified: incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java (original)
+++ incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java Wed Dec  6 02:59:27 2006
@@ -70,6 +70,16 @@
     {
     }
 
+    public boolean hasFilters()
+    {
+        return false;
+    }
+
+    public boolean hasInterest(AMQMessage msg)
+    {
+        return true;
+    }
+
     public int hashCode()
     {
         return key.hashCode();

Modified: incubator/qpid/branches/jmsselectors/specs/amqp-8.0.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/specs/amqp-8.0.xml?view=diff&rev=483025&r1=480507&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/specs/amqp-8.0.xml (original)
+++ incubator/qpid/branches/jmsselectors/specs/amqp-8.0.xml Wed Dec  6 02:59:27 2006
@@ -2085,6 +2085,13 @@
     method it will raise a channel or connection exception.
     </doc>
   </field>
+
+    <field name="filter" type="table" label="arguments for consuming">
+        <doc>
+            A set of filters for the consume. The syntax and semantics
+            of these filters depend on the provider's implementation.
+        </doc>
+    </field>
 </method>
 
 <method name = "consume-ok" synchronous = "1" index = "21">
@@ -2446,9 +2453,9 @@
     A client MUST NOT use this method as a means of selecting messages
     to process.  A rejected message MAY be discarded or dead-lettered,
     not necessarily passed to another client.
-  </doc>      
+  </doc>
   <chassis name = "server" implement = "MUST" />
-    
+
   <field name = "delivery tag" domain = "delivery tag" />
 
   <field name = "requeue" type = "bit">
@@ -2490,7 +2497,7 @@
     The server MUST set the redelivered flag on all messages that are resent.
   </doc>
   <doc name="rule">
-    The server MUST raise a channel exception if this is called on a 
+    The server MUST raise a channel exception if this is called on a
     transacted channel.
   </doc>
 </method>
@@ -2792,7 +2799,7 @@
   <response name = "open-ok" />
   <chassis name = "server" implement = "MUST" />
   <chassis name = "client" implement = "MUST" />
-  
+
   <field name = "identifier" type = "shortstr">
     staging identifier
     <doc>
@@ -2829,7 +2836,7 @@
   <response name = "stage" />
   <chassis name = "server" implement = "MUST" />
   <chassis name = "client" implement = "MUST" />
-  
+
   <field name = "staged size" type = "longlong">
     already staged amount
     <doc>
@@ -3045,7 +3052,7 @@
   </doc>
   <chassis name = "server" implement = "MUST" />
   <field name = "delivery tag" domain = "delivery tag" />
-      
+
   <field name = "multiple" type = "bit">
     acknowledge multiple messages
     <doc>
@@ -3084,7 +3091,7 @@
     not necessarily passed to another client.
   </doc>
   <chassis name = "server" implement = "MUST" />
-    
+
   <field name = "delivery tag" domain = "delivery tag" />
 
   <field name = "requeue" type = "bit">
@@ -3483,7 +3490,7 @@
     <doc>
       Specifies the routing key name specified when the message was
       published.
-    </doc>     
+    </doc>
   </field>
 </method>