You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/26 22:04:39 UTC

svn commit: r490372 [1/3] - in /incubator/qpid/branches/new_persistence/java: ./ broker/ broker/etc/ broker/src/main/grammar/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/filter/ broker/src/main/java/org/apac...

Author: rgreig
Date: Tue Dec 26 13:04:28 2006
New Revision: 490372

URL: http://svn.apache.org/viewvc?view=rev&rev=490372
Log:
Merge of trunk up to rev 489403

Added:
    incubator/qpid/branches/new_persistence/java/broker/etc/qpid-server.conf.jpp
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/etc/qpid-server.conf.jpp
    incubator/qpid/branches/new_persistence/java/broker/src/main/grammar/
      - copied from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/grammar/
    incubator/qpid/branches/new_persistence/java/broker/src/main/grammar/SelectorParser.jj
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/
      - copied from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/
      - copied from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/jms/
      - copied from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/log4j.properties
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/common/
      - copied from r489403, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
    incubator/qpid/branches/new_persistence/java/distribution/
    incubator/qpid/branches/new_persistence/java/distribution/pom.xml   (with props)
    incubator/qpid/branches/new_persistence/java/distribution/src/
    incubator/qpid/branches/new_persistence/java/distribution/src/main/
    incubator/qpid/branches/new_persistence/java/distribution/src/main/assembly/
    incubator/qpid/branches/new_persistence/java/distribution/src/main/assembly/bin.xml   (with props)
    incubator/qpid/branches/new_persistence/java/distribution/src/main/assembly/client-bin.xml   (with props)
    incubator/qpid/branches/new_persistence/java/distribution/src/main/assembly/management-eclipse-plugin-unix.xml   (with props)
    incubator/qpid/branches/new_persistence/java/distribution/src/main/assembly/management-eclipse-plugin.xml   (with props)
    incubator/qpid/branches/new_persistence/java/distribution/src/main/assembly/src.xml   (with props)
    incubator/qpid/branches/new_persistence/java/distribution/src/main/release/
    incubator/qpid/branches/new_persistence/java/distribution/src/main/release/DISCLAIMER
    incubator/qpid/branches/new_persistence/java/distribution/src/main/release/docs/
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
      - copied unchanged from r489403, incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
      - copied, changed from r489403, incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
Modified:
    incubator/qpid/branches/new_persistence/java/broker/etc/config.xml
    incubator/qpid/branches/new_persistence/java/broker/pom.xml
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
    incubator/qpid/branches/new_persistence/java/client/pom.xml
    incubator/qpid/branches/new_persistence/java/client/src/log4j.properties
    incubator/qpid/branches/new_persistence/java/client/src/main/java/log4j.properties
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
    incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
    incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
    incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
    incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
    incubator/qpid/branches/new_persistence/java/pom.xml
    incubator/qpid/branches/new_persistence/java/systests/pom.xml
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java

Modified: incubator/qpid/branches/new_persistence/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/etc/config.xml?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/etc/config.xml (original)
+++ incubator/qpid/branches/new_persistence/java/broker/etc/config.xml Tue Dec 26 13:04:28 2006
@@ -82,8 +82,8 @@
         <auto_register>true</auto_register>
     </queue>
     <store>
-        <!--<class>org.apache.qpid.server.store.MemoryMessageStore</class>-->
-        <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+        <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+        <!--<class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>-->
     </store>
     <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
 </broker>

Modified: incubator/qpid/branches/new_persistence/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/pom.xml?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/pom.xml (original)
+++ incubator/qpid/branches/new_persistence/java/broker/pom.xml Tue Dec 26 13:04:28 2006
@@ -34,7 +34,6 @@
 
     <properties>
         <topDirectoryLocation>..</topDirectoryLocation>
-        <amqj.logging.level>warn</amqj.logging.level>
     </properties>
 
     <dependencies>
@@ -55,6 +54,10 @@
             <artifactId>commons-lang</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jms_1.1_spec</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.mina</groupId>
             <artifactId>mina-filter-ssl</artifactId>
         </dependency>
@@ -82,6 +85,29 @@
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>javacc-maven-plugin</artifactId>
+                <version>2.0</version>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <configuration>
+                            <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory>
+                            <outputDirectory>${basedir}/target/generated-sources</outputDirectory>
+                            <packageName>org.apache.qpid.server.filter.jms.selector</packageName>
+                        </configuration>
+                        <goals>
+                            <goal>javacc</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Dec 26 13:04:28 2006
@@ -25,6 +25,8 @@
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.ack.TxAck;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
@@ -41,6 +43,8 @@
 import org.apache.qpid.server.txn.TxnBuffer;
 
 import java.util.*;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -102,6 +106,8 @@
 
     private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
 
+    private Set<Long> _browsedAcks = new HashSet<Long>();
+
     public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
             throws AMQException
     {
@@ -111,7 +117,7 @@
         _messageStore = messageStore;
         _exchanges = exchanges;
         // by default the session is non-transactional
-        _txnContext = new NonTransactionalContext(_messageStore, this, _returnMessages);
+        _txnContext = new NonTransactionalContext(_messageStore, this, _returnMessages, _browsedAcks);
     }
 
     /**
@@ -311,13 +317,14 @@
      * @param tag     the tag chosen by the client (if null, server will generate one)
      * @param queue   the queue to subscribe to
      * @param session the protocol session of the subscriber
+     * @param noLocal
      * @return the consumer tag. This is returned to the subscriber and used in
      *         subsequent unsubscribe requests
      * @throws ConsumerTagNotUniqueException if the tag is not unique
      * @throws AMQException                  if something goes wrong
      */
-    public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks)
-            throws AMQException, ConsumerTagNotUniqueException
+    public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
+                                   FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
     {
         if (tag == null)
         {
@@ -328,7 +335,7 @@
             throw new ConsumerTagNotUniqueException();
         }
 
-        queue.registerProtocolSession(session, _channelId, tag, acks);
+        queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal);
         _consumerTag2QueueMap.put(tag, queue);
         return tag;
     }
@@ -524,6 +531,12 @@
         return _unacknowledgedMessageMap;
     }
 
+    public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue)
+    {
+        _browsedAcks.add(deliveryTag);
+        addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+    }
+    
     private void checkSuspension()
     {
         boolean suspend;

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,7 @@
 
 
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
 
 import javax.jms.JMSException;
 
@@ -58,7 +59,8 @@
                 throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue);
             }
 
-            public String getExpressionSymbol() {
+            public String getExpressionSymbol()
+            {
                 return "+";
             }
         };
@@ -193,7 +195,8 @@
         }
     }
 
-    public Object evaluate(AMQMessage message) throws JMSException {
+    public Object evaluate(AMQMessage message) throws AMQException
+    {
         Object lvalue = left.evaluate(message);
         if (lvalue == null) {
             return null;

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java Tue Dec 26 13:04:28 2006
@@ -23,6 +23,7 @@
 
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
 
 import javax.jms.JMSException;
 
@@ -33,13 +34,14 @@
  *
  * @version $Revision$
  */
-public interface BooleanExpression extends Expression {
-    
+public interface BooleanExpression extends Expression
+{
+
     /**
      * @param message
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws JMSException
      */
-    public boolean matches(AMQMessage message) throws JMSException;
+    public boolean matches(AMQMessage message) throws AMQException;
 
 }

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
 
 import java.util.HashSet;
 import java.util.List;
@@ -123,7 +124,7 @@
         /**
          *  org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
          */
-        public Object evaluate(AMQMessage message) throws JMSException {
+        public Object evaluate(AMQMessage message) throws AMQException {
 
             Object rv = this.getRight().evaluate(message);
 
@@ -139,7 +140,7 @@
             return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
         }
         
-        public boolean matches(AMQMessage message) throws JMSException {
+        public boolean matches(AMQMessage message) throws AMQException {
             Object object = evaluate(message);
             return object!=null && object==Boolean.TRUE;            
         }
@@ -199,7 +200,7 @@
 	private static BooleanExpression doCreateEqual(Expression left, Expression right) {
         return new ComparisonExpression(left, right) {
 
-            public Object evaluate(AMQMessage message) throws JMSException {
+            public Object evaluate(AMQMessage message) throws AMQException {
                 Object lv = left.evaluate(message);
                 Object rv = right.evaluate(message);
                 
@@ -340,7 +341,8 @@
         super(left, right);
     }
 
-    public Object evaluate(AMQMessage message) throws JMSException {
+    public Object evaluate(AMQMessage message) throws AMQException
+    {
         Comparable lv = (Comparable) left.evaluate(message);
         if (lv == null) {
             return null;
@@ -457,7 +459,7 @@
 
     protected abstract boolean asBoolean(int answer);
     
-    public boolean matches(AMQMessage message) throws JMSException {
+    public boolean matches(AMQMessage message) throws AMQException {
         Object object = evaluate(message);
         return object!=null && object==Boolean.TRUE;            
     }

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
 
 import java.math.BigDecimal;
 
@@ -29,19 +30,24 @@
 
 /**
  * Represents a constant expression
- * 
+ *
  * @version $Revision$
  */
-public class ConstantExpression implements Expression {
+public class ConstantExpression implements Expression
+{
 
-    static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression {
-        public BooleanConstantExpression(Object value) {
+    static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression
+    {
+        public BooleanConstantExpression(Object value)
+        {
             super(value);
         }
-        public boolean matches(AMQMessage message) throws JMSException {
+
+        public boolean matches(AMQMessage message) throws AMQException
+        {
             Object object = evaluate(message);
-            return object!=null && object==Boolean.TRUE;            
-        }        
+            return object != null && object == Boolean.TRUE;
+        }
     }
 
     public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null);
@@ -50,73 +56,92 @@
 
     private Object value;
 
-    public static ConstantExpression createFromDecimal(String text) {
-    	    	
-    	// Strip off the 'l' or 'L' if needed.
-    	if( text.endsWith("l") || text.endsWith("L") )
-    		text = text.substring(0, text.length()-1);
-
-    	Number value;
-    	try {
-    		value = new Long(text);
-    	} catch ( NumberFormatException e) {
-    		// The number may be too big to fit in a long.
-        	value = new BigDecimal(text);    		
-    	}
-    	
+    public static ConstantExpression createFromDecimal(String text)
+    {
+
+        // Strip off the 'l' or 'L' if needed.
+        if (text.endsWith("l") || text.endsWith("L"))
+        {
+            text = text.substring(0, text.length() - 1);
+        }
+
+        Number value;
+        try
+        {
+            value = new Long(text);
+        }
+        catch (NumberFormatException e)
+        {
+            // The number may be too big to fit in a long.
+            value = new BigDecimal(text);
+        }
+
         long l = value.longValue();
-        if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+        if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE)
+        {
             value = new Integer(value.intValue());
         }
         return new ConstantExpression(value);
     }
 
-    public static ConstantExpression createFromHex(String text) {
+    public static ConstantExpression createFromHex(String text)
+    {
         Number value = new Long(Long.parseLong(text.substring(2), 16));
         long l = value.longValue();
-        if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+        if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE)
+        {
             value = new Integer(value.intValue());
         }
         return new ConstantExpression(value);
     }
 
-    public static ConstantExpression createFromOctal(String text) {
+    public static ConstantExpression createFromOctal(String text)
+    {
         Number value = new Long(Long.parseLong(text, 8));
         long l = value.longValue();
-        if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+        if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE)
+        {
             value = new Integer(value.intValue());
         }
         return new ConstantExpression(value);
     }
 
-    public static ConstantExpression createFloat(String text) {
+    public static ConstantExpression createFloat(String text)
+    {
         Number value = new Double(text);
         return new ConstantExpression(value);
     }
 
-    public ConstantExpression(Object value) {
+    public ConstantExpression(Object value)
+    {
         this.value = value;
     }
 
-    public Object evaluate(AMQMessage message) throws JMSException {
+    public Object evaluate(AMQMessage message) throws AMQException
+    {
         return value;
     }
 
-    public Object getValue() {
+    public Object getValue()
+    {
         return value;
-    }    
+    }
 
     /**
      * @see java.lang.Object#toString()
      */
-    public String toString() {
-        if (value == null) {
+    public String toString()
+    {
+        if (value == null)
+        {
             return "NULL";
         }
-        if (value instanceof Boolean) {
+        if (value instanceof Boolean)
+        {
             return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE";
         }
-        if (value instanceof String) {
+        if (value instanceof String)
+        {
             return encodeString((String) value);
         }
         return value.toString();
@@ -127,7 +152,8 @@
      *
      * @see java.lang.Object#hashCode()
      */
-    public int hashCode() {
+    public int hashCode()
+    {
         return toString().hashCode();
     }
 
@@ -136,9 +162,11 @@
      *
      * @see java.lang.Object#equals(java.lang.Object)
      */
-    public boolean equals(Object o) {
+    public boolean equals(Object o)
+    {
 
-        if (o == null || !this.getClass().equals(o.getClass())) {
+        if (o == null || !this.getClass().equals(o.getClass()))
+        {
             return false;
         }
         return toString().equals(o.toString());
@@ -153,12 +181,15 @@
      * @param s
      * @return
      */
-    public static String encodeString(String s) {
+    public static String encodeString(String s)
+    {
         StringBuffer b = new StringBuffer();
         b.append('\'');
-        for (int i = 0; i < s.length(); i++) {
+        for (int i = 0; i < s.length(); i++)
+        {
             char c = s.charAt(i);
-            if (c == '\'') {
+            if (c == '\'')
+            {
                 b.append(c);
             }
             b.append(c);
@@ -166,5 +197,5 @@
         b.append('\'');
         return b.toString();
     }
-    
+
 }

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java Tue Dec 26 13:04:28 2006
@@ -23,6 +23,7 @@
 
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
 
 import javax.jms.JMSException;
 
@@ -32,11 +33,12 @@
  * 
  * @version $Revision$
  */
-public interface Expression {
+public interface Expression
+{
 
     /**
      * @return the value of this expression
      */
-    public Object evaluate(AMQMessage message) throws JMSException;
+    public Object evaluate(AMQMessage message) throws AMQException;
     
 }

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Tue Dec 26 13:04:28 2006
@@ -64,7 +64,7 @@
             _logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
             return match;
         }
-        catch (JMSException e)
+        catch (AMQException e)
         {
             //fixme this needs to be sorted.. it shouldn't happen
             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
 
 import javax.jms.JMSException;
 
@@ -35,7 +36,7 @@
     public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) {
         return new LogicExpression(lvalue, rvalue) {
         	
-            public Object evaluate(AMQMessage message) throws JMSException {
+            public Object evaluate(AMQMessage message) throws AMQException {
                 
             	Boolean lv = (Boolean) left.evaluate(message);
                 // Can we do an OR shortcut??
@@ -56,7 +57,7 @@
     public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) {
         return new LogicExpression(lvalue, rvalue) {
 
-            public Object evaluate(AMQMessage message) throws JMSException {
+            public Object evaluate(AMQMessage message) throws AMQException {
 
                 Boolean lv = (Boolean) left.evaluate(message);
 
@@ -85,9 +86,9 @@
         super(left, right);
     }
 
-    abstract public Object evaluate(AMQMessage message) throws JMSException;
+    abstract public Object evaluate(AMQMessage message) throws AMQException;
 
-    public boolean matches(AMQMessage message) throws JMSException {
+    public boolean matches(AMQMessage message) throws AMQException {
         Object object = evaluate(message);
         return object!=null && object==Boolean.TRUE;            
     }

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java Tue Dec 26 13:04:28 2006
@@ -21,9 +21,7 @@
 package org.apache.qpid.server.filter;
 
 import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.filter.jms.selector.SelectorParser;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidSelectorException;
 import org.apache.log4j.Logger;
 
 

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Tue Dec 26 13:04:28 2006
@@ -47,7 +47,7 @@
 
     interface SubExpression
     {
-        public Object evaluate(AMQMessage message);
+        public Object evaluate(AMQMessage message) throws AMQException;
     }
 
     interface JMSExpression
@@ -65,7 +65,7 @@
         }
 
 
-        public Object evaluate(AMQMessage message)
+        public Object evaluate(AMQMessage message) throws AMQException
         {
             JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE);
             if (msg != null)
@@ -226,7 +226,7 @@
         jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name);
     }
 
-    public Object evaluate(AMQMessage message) throws JMSException
+    public Object evaluate(AMQMessage message) throws AMQException
     {
 //        try
 //        {

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
 
 import java.math.BigDecimal;
 import java.util.Collection;
@@ -43,7 +44,8 @@
 
     public static Expression createNegate(Expression left) {
         return new UnaryExpression(left) {
-            public Object evaluate(AMQMessage message) throws JMSException {
+            public Object evaluate(AMQMessage message) throws AMQException
+            {
                 Object rvalue = right.evaluate(message);
                 if (rvalue == null) {
                     return null;
@@ -74,7 +76,7 @@
     	final Collection inList = t;
     	
         return new BooleanUnaryExpression(right) {
-            public Object evaluate(AMQMessage message) throws JMSException {
+            public Object evaluate(AMQMessage message) throws AMQException {
             	
                 Object rvalue = right.evaluate(message);
                 if (rvalue == null) {
@@ -126,7 +128,7 @@
             super(left);
         }
 
-        public boolean matches(AMQMessage message) throws JMSException {
+        public boolean matches(AMQMessage message) throws AMQException {
             Object object = evaluate(message);
             return object!=null && object==Boolean.TRUE;            
         }
@@ -135,7 +137,7 @@
         
     public static BooleanExpression createNOT(BooleanExpression left) {
         return new BooleanUnaryExpression(left) {
-            public Object evaluate(AMQMessage message) throws JMSException {
+            public Object evaluate(AMQMessage message) throws AMQException {
                 Boolean lvalue = (Boolean) right.evaluate(message);
                 if (lvalue == null) {
                     return null;
@@ -159,7 +161,7 @@
 
     public static BooleanExpression createBooleanCast(Expression left) {
         return new BooleanUnaryExpression(left) {
-            public Object evaluate(AMQMessage message) throws JMSException {
+            public Object evaluate(AMQMessage message) throws AMQException {
                 Object rvalue = right.evaluate(message);
                 if (rvalue == null) 
                     return null;

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java Tue Dec 26 13:04:28 2006
@@ -31,6 +31,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
 
 /**
  * Used to evaluate an XPath Expression in a JMS selector.
@@ -75,7 +76,7 @@
     private final XPathEvaluator evaluator;
     
     static public interface XPathEvaluator {
-        public boolean evaluate(AMQMessage message) throws JMSException;
+        public boolean evaluate(AMQMessage message) throws AMQException;
     }    
     
     XPathExpression(String xpath) {
@@ -97,7 +98,7 @@
         }
     }
 
-    public Object evaluate(AMQMessage message) throws JMSException {
+    public Object evaluate(AMQMessage message) throws AMQException {
 //        try {
 //FIXME this is flow to disk work
 //            if( message.isDropped() )
@@ -122,7 +123,8 @@
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws JMSException
      */
-    public boolean matches(AMQMessage message) throws JMSException {
+    public boolean matches(AMQMessage message) throws AMQException
+    {
         Object object = evaluate(message);
         return object!=null && object==Boolean.TRUE;            
     }

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java Tue Dec 26 13:04:28 2006
@@ -18,6 +18,7 @@
 package org.apache.qpid.server.filter;
 
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
 
 import javax.jms.JMSException;
 //
@@ -35,7 +36,7 @@
         this.xpath = xpath;
     }
 
-    public Object evaluate(AMQMessage message) throws JMSException {
+    public Object evaluate(AMQMessage message) throws AMQException {
         return Boolean.FALSE;
     }
 
@@ -48,7 +49,8 @@
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws JMSException
      */
-    public boolean matches(AMQMessage message) throws JMSException {
+    public boolean matches(AMQMessage message) throws AMQException
+    {
         Object object = evaluate(message);
         return object!=null && object==Boolean.TRUE;            
     }

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java Tue Dec 26 13:04:28 2006
@@ -34,6 +34,7 @@
 //import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.xpath.CachedXPathAPI;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
 import org.w3c.dom.Document;
 import org.w3c.dom.traversal.NodeIterator;
 import org.xml.sax.InputSource;
@@ -46,17 +47,26 @@
         this.xpath = xpath;
     }
     
-    public boolean evaluate(AMQMessage m) throws JMSException {
-        if( m instanceof TextMessage ) {
-            String text = ((TextMessage)m).getText();
-            return evaluate(text);                
-        } else if ( m instanceof BytesMessage ) {
-            BytesMessage bm = (BytesMessage) m;
-            byte data[] = new byte[(int) bm.getBodyLength()];
-            bm.readBytes(data);
-            return evaluate(data);
-        }            
-        return false;
+    public boolean evaluate(AMQMessage m) throws AMQException
+    {
+        try
+        {
+
+            if( m instanceof TextMessage ) {
+                String text = ((TextMessage)m).getText();
+                return evaluate(text);
+            } else if ( m instanceof BytesMessage ) {
+                BytesMessage bm = (BytesMessage) m;
+                byte data[] = new byte[(int) bm.getBodyLength()];
+                bm.readBytes(data);
+                return evaluate(data);
+            }
+            return false;
+        }
+        catch (JMSException e)
+        {
+            throw new AMQException("Error evaluting message: " + e, e);
+        }
     }
 
     private boolean evaluate(byte[] data) {

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Dec 26 13:04:28 2006
@@ -21,10 +21,12 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.framing.BasicConsumeBody;
 import org.apache.qpid.framing.BasicConsumeOkBody;
 import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ConsumerTagNotUniqueException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -32,6 +34,7 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.log4j.Logger;
@@ -74,8 +77,9 @@
             }
             try
             {
-                String consumerTag = channel.subscribeToQueue(body.consumerTag, queue,  session, !body.noAck);
-                if(!body.nowait)
+                String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
+                                                              body.arguments, body.noLocal);
+                if (!body.nowait)
                 {
                     session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
                 }
@@ -83,10 +87,19 @@
                 //now allow queue to start async processing of any backlog of messages
                 queue.deliverAsync();
             }
+            catch (AMQInvalidSelectorException ise)
+            {
+                _log.info("Closing connection due to invalid selector");
+                session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
+                                                                      ise.getMessage(), BasicConsumeBody.CLASS_ID,
+                                                                      BasicConsumeBody.METHOD_ID));
+            }
             catch (ConsumerTagNotUniqueException e)
             {
                 String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
-                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID));
+                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
+                                                                      BasicConsumeBody.CLASS_ID,
+                                                                      BasicConsumeBody.METHOD_ID));
             }
         }
     }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Tue Dec 26 13:04:28 2006
@@ -78,12 +78,19 @@
 
             AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
 
+            //save clientProperties
+            if (protocolSession.getClientProperties() == null)
+            {
+                protocolSession.setClientProperties(body.clientProperties);
+            }
+
             switch (authResult.status)
             {
                 case ERROR:
                     throw new AMQException("Authentication failed");
                 case SUCCESS:
                     _logger.info("Connected as: " + ss.getAuthorizationID());
+
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
                     AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
                                                                       HeartbeatConfig.getInstance().getDelay());
@@ -122,7 +129,7 @@
     static int getConfiguredFrameSize()
     {
         final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
-        final int framesize =  config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
+        final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
         _logger.info("Framesize set to " + framesize);
         return framesize;
     }

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java Tue Dec 26 13:04:28 2006
@@ -24,6 +24,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
 
 import javax.jms.Message;
 import javax.jms.JMSException;
@@ -37,7 +38,7 @@
     private AMQMessage _message;
     private BasicContentHeaderProperties _properties;
 
-    public JMSMessage(AMQMessage message)
+    public JMSMessage(AMQMessage message) throws AMQException
     {
         _message = message;
         ContentHeaderBody contentHeader = message.getContentHeaderBody();

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Tue Dec 26 13:04:28 2006
@@ -26,9 +26,19 @@
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
-import org.apache.qpid.framing.*;
+
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.management.Managable;
@@ -84,6 +94,7 @@
     /* AMQP Version for this session */
     private byte _major;
     private byte _minor;
+    private FieldTable _clientProperties;
 
     public ManagedObject getManagedObject()
     {
@@ -119,7 +130,7 @@
         {
             return new AMQProtocolSessionMBean(this);
         }
-        catch(JMException ex)
+        catch (JMException ex)
         {
             _logger.error("AMQProtocolSession MBean creation has failed ", ex);
             throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
@@ -144,7 +155,7 @@
         {
             ProtocolInitiation pi = (ProtocolInitiation) message;
             // this ensures the codec never checks for a PI message again
-            ((AMQDecoder)_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+            ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
             try
             {
                 pi.checkVersion(this); // Fails if not correct
@@ -153,7 +164,7 @@
                 _minor = pi.protocolMinor;
                 String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
                 String locales = "en_US";
-                AMQFrame response = ConnectionStartBody.createAMQFrame((short)0, pi.protocolMajor, pi.protocolMinor, null,
+                AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null,
                                                                        mechanisms.getBytes(), locales.getBytes());
                 _minaProtocolSession.write(response);
             }
@@ -195,7 +206,7 @@
             _logger.debug("Method frame received: " + frame);
         }
         final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel,
-                                                                                    (AMQMethodBody)frame.bodyFrame);
+                                                                                    (AMQMethodBody) frame.bodyFrame);
         try
         {
             boolean wasAnyoneInterested = false;
@@ -250,7 +261,7 @@
         {
             _logger.debug("Content header frame received: " + frame);
         }
-        getChannel(frame.channel).publishContentHeader((ContentHeaderBody)frame.bodyFrame);
+        getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame);
     }
 
     private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -294,8 +305,13 @@
         return _channelMap.get(channelId);
     }
 
-    public void addChannel(AMQChannel channel)
+    public void addChannel(AMQChannel channel) throws AMQException
     {
+        if (_closed)
+        {
+            throw new AMQException("Session is closed");    
+        }
+
         _channelMap.put(channel.getChannelId(), channel);
         checkForNotification();
     }
@@ -339,6 +355,7 @@
      * Close a specific channel. This will remove any resources used by the channel, including:
      * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
      * </ul>
+     *
      * @param channelId id of the channel to close
      * @throws AMQException if an error occurs closing the channel
      * @throws IllegalArgumentException if the channel id is not valid
@@ -365,6 +382,7 @@
 
     /**
      * In our current implementation this is used by the clustering code.
+     *
      * @param channelId
      */
     public void removeChannel(int channelId)
@@ -374,11 +392,12 @@
 
     /**
      * Initialise heartbeats on the session.
+     *
      * @param delay delay in seconds (not ms)
      */
     public void initHeartbeats(int delay)
     {
-        if(delay > 0)
+        if (delay > 0)
         {
             _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
             _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay));
@@ -388,6 +407,7 @@
     /**
      * Closes all channels that were opened by this protocol session. This frees up all resources
      * used by the channel.
+     *
      * @throws AMQException if an error occurs while closing any channel
      */
     private void closeAllChannels() throws AMQException
@@ -396,6 +416,7 @@
         {
             channel.close(this);
         }
+        _channelMap.clear();
     }
 
     /**
@@ -404,7 +425,7 @@
      */
     public void closeSession() throws AMQException
     {
-        if(!_closed)
+        if (!_closed)
         {
             _closed = true;
             closeAllChannels();
@@ -446,11 +467,11 @@
         // information is used by SASL primary.
         if (address instanceof InetSocketAddress)
         {
-            return ((InetSocketAddress)address).getHostName();
+            return ((InetSocketAddress) address).getHostName();
         }
         else if (address instanceof VmPipeAddress)
         {
-            return "vmpipe:" + ((VmPipeAddress)address).getPort();
+            return "vmpipe:" + ((VmPipeAddress) address).getPort();
         }
         else
         {
@@ -466,6 +487,16 @@
     public void setSaslServer(SaslServer saslServer)
     {
         _saslServer = saslServer;
+    }
+
+    public FieldTable getClientProperties()
+    {
+        return _clientProperties;
+    }
+
+    public void setClientProperties(FieldTable clientProperties)
+    {
+        _clientProperties = clientProperties;
     }
 
     /**

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Dec 26 13:04:28 2006
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol;
 
 import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.AMQException;
 
@@ -69,7 +70,7 @@
      * @param channel the channel to associate with this session. It is an error to
      * associate the same channel with more than one session but this is not validated.
      */
-    void addChannel(AMQChannel channel);
+    void addChannel(AMQChannel channel) throws AMQException;
 
     /**
      * Close a specific channel. This will remove any resources used by the channel, including:
@@ -122,4 +123,9 @@
      * @param saslServer
      */
     void setSaslServer(SaslServer saslServer);
+
+
+    FieldTable getClientProperties();
+
+    void setClientProperties(FieldTable clientProperties);
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Tue Dec 26 13:04:28 2006
@@ -18,6 +18,9 @@
 package org.apache.qpid.server.protocol;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.MBeanConstructor;
@@ -183,33 +186,17 @@
     }
 
     /**
-     * @see AMQMinaProtocolSession#closeChannel(int) 
-     */
-    public void closeChannel(int id) throws JMException
-    {
-        try
-        {
-            AMQChannel channel = _session.getChannel(id);
-            if (channel == null)
-            {
-                throw new JMException("The channel (channel Id = " + id + ") does not exist");
-            }
-
-            _session.closeChannel(id);
-        }
-        catch (AMQException ex)
-        {
-            throw new MBeanException(ex, ex.toString());
-        }
-    }
-
-    /**
      * closes the connection. The administrator can use this management operation to close connection to free up
      * resources.
      * @throws JMException
      */
     public void closeConnection() throws JMException
     {
+        
+        final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, AMQConstant.REPLY_SUCCESS.getCode(),
+                                                      "Broker Management Console has closing the connection.", 0, 0);
+        _session.writeFrame(response);
+
         try
         {
             _session.closeSession();

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java Tue Dec 26 13:04:28 2006
@@ -114,15 +114,6 @@
     void rollbackTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException;
 
     /**
-     * Unsubscribes the consumers and unregisters the channel from managed objects.
-     */
-    @MBeanOperation(name="closeChannel",
-                    description="Closes the channel with given channel Id and connected consumers will be unsubscribed",
-                    impact= MBeanOperationInfo.ACTION)
-    void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId)
-        throws Exception;
-
-    /**
      * Closes all the related channels and unregisters this connection from managed objects.
      */
     @MBeanOperation(name="closeConnection",

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Dec 26 13:04:28 2006
@@ -26,10 +26,14 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.message.jms.JMSMessage;
 import org.apache.log4j.Logger;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Combines the information that make up a deliverable message into a more manageable form.
@@ -38,6 +42,8 @@
 {
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
 
+    public static final String JMS_MESSAGE = "jms.message";
+
     /**
      * Used in clustering
      */
@@ -64,6 +70,9 @@
      */
     private boolean _deliveredToConsumer;
 
+    private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
+    private AtomicBoolean _taken;
+    
     private TransientMessageData _transientMessageData = new TransientMessageData();
 
     /**
@@ -141,6 +150,8 @@
         _messageId = messageId;
         _txnContext = txnContext;
         _transientMessageData.setPublishBody(publishBody);
+        _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
+        _taken = new AtomicBoolean(false);
         if (_log.isDebugEnabled())
         {
             _log.debug("Message created with id " + messageId);
@@ -358,6 +369,60 @@
         return _publisher;
     }
 
+    /**
+     * Called selectors to determin if the message has already been sent
+     * @return   _deliveredToConsumer
+     */
+    public boolean getDeliveredToConsumer()
+    {
+        return _deliveredToConsumer;
+    }
+
+
+    public MessageDecorator getDecodedMessage(String type) throws AMQException
+    {
+        MessageDecorator msgtype = null;
+
+        if (_decodedMessages != null)
+        {
+            msgtype = _decodedMessages.get(type);
+
+            if (msgtype == null)
+            {
+                msgtype = decorateMessage(type);
+            }
+        }
+
+        return msgtype;
+    }
+
+    private MessageDecorator decorateMessage(String type) throws AMQException
+    {
+        MessageDecorator msgdec = null;
+
+        if (type.equals(JMS_MESSAGE))
+        {
+            msgdec = new JMSMessage(this);
+        }
+
+        if (msgdec != null)
+        {
+            _decodedMessages.put(type, msgdec);
+        }
+
+        return msgdec;
+    }
+
+    public boolean taken()
+    {
+        return _taken.getAndSet(true);
+    }
+
+    public void release()
+    {
+        _taken.set(false);
+    }
+    
     public boolean checkToken(Object token)
     {
         if (_tokens.contains(token))
@@ -507,8 +572,7 @@
         }
 
         //
-        // Now start writing out the other content bodies
-        // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+        // Now start writing out the other content bodies        
         //
         while (bodyFrameIterator.hasNext())
         {

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Dec 26 13:04:28 2006
@@ -22,7 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
@@ -96,7 +96,7 @@
      * max allowed number of messages on a queue.
      */
     private Integer _maxMessageCount = 10000;
-    
+
     /**
      * max queue depth(KB) for the queue
      */
@@ -188,16 +188,29 @@
         _subscribers = subscribers;
         _subscriptionFactory = subscriptionFactory;
 
-        //fixme - Pick one.
-        if (Boolean.getBoolean("concurrentdeliverymanager"))
+        //fixme - Make this configurable via the broker config.xml
+        if (System.getProperties().getProperty("deliverymanager") != null)
         {
-            _logger.info("Using ConcurrentDeliveryManager");
-            _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+            if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
+            {
+                _logger.info("Using ConcurrentSelectorDeliveryManager");
+                _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+            }
+            else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
+            {
+                _logger.info("Using ConcurrentDeliveryManager");
+                _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+            }
+            else
+            {
+                _logger.info("Using SynchronizedDeliveryManager");
+                _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+            }
         }
         else
         {
-            _logger.info("Using SynchronizedDeliveryManager");
-            _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+            _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
+            _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
         }
     }
 
@@ -349,12 +362,26 @@
         _bindings.addBinding(routingKey, exchange);
     }
 
-    public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks)
+    public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+    {
+        registerProtocolSession(ps, channel, consumerTag, acks, filters, false);
+    }
+
+    public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
             throws AMQException
     {
         debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
 
-        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks);
+        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
+
+        if(subscription.hasFilters())
+        {
+            if (_deliveryMgr.hasQueuedMessages())
+            {
+                _deliveryMgr.populatePreDeliveryQueue(subscription);   
+            }
+        }
+
         _subscribers.addSubscriber(subscription);
     }
 

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java Tue Dec 26 13:04:28 2006
@@ -195,6 +195,11 @@
         return new ArrayList<AMQMessage>(_messages);
     }
 
+    public void populatePreDeliveryQueue(Subscription subscription)
+    {
+        //no-op . This DM has no PreDeliveryQueues
+    }
+
     public synchronized void removeAMessageFromTop() throws AMQException
     {
         AMQMessage msg = poll();
@@ -309,7 +314,6 @@
                 else
                 {
                     s.send(msg, _queue);
-                    msg.setDeliveredToConsumer();
                 }
             }
             finally

Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Dec 26 13:04:28 2006
@@ -99,10 +99,10 @@
         // Shrink the ContentBodies to their actual size to save memory.
         if (compressBufferOnQueue)
         {
-            Iterator it = msg.getContentBodies().iterator();
+            Iterator<ContentBody> it = msg.getContentBodyIterator();
             while (it.hasNext())
             {
-                ContentBody cb = (ContentBody) it.next();
+                ContentBody cb = it.next();
                 cb.reduceBufferToFit();
             }
         }
@@ -220,7 +220,7 @@
             //remove sent message from our queue.
             messageQueue.poll();
         }
-        catch (FailedDequeueException e)
+        catch (AMQException e)
         {
             message.release();
             _log.error("Unable to deliver message as dequeue failed: " + e, e);
@@ -276,7 +276,7 @@
         return _messages.poll();
     }
 
-    public void deliver(String name, AMQMessage msg) throws FailedDequeueException
+    public void deliver(String name, AMQMessage msg) throws AMQException
     {
         _log.info(id() + "deliver :" + System.identityHashCode(msg));
 
@@ -289,7 +289,7 @@
             if (s == null) //no-one can take the message right now.
             {
                 _log.info(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
-                if (!msg.isImmediate())
+                if (!msg.getPublishBody().immediate)
                 {
                     addMessageToQueue(msg);
 

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Tue Dec 26 13:04:28 2006
@@ -73,4 +73,6 @@
     void clearAllMessages() throws AMQException;
 
     List<AMQMessage> getMessages();
+
+    void populatePreDeliveryQueue(Subscription subscription);
 }