You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/16 13:13:28 UTC

svn commit: r496666 [1/3] - in /incubator/qpid/branches/perftesting/qpid: java/broker/ java/broker/src/main/grammar/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/filter/ java/broker/src/main/java/or...

Author: ritchiem
Date: Tue Jan 16 04:13:19 2007
New Revision: 496666

URL: http://svn.apache.org/viewvc?view=rev&rev=496666
Log:
Merged Trunk Changes to version 489140.

The point where we achieved 100% JMS compliance.

Revision: 489140
Author: ritchiem
Date: 17:04:33, 20 December 2006
Message:
QPID-225
Applied Patch for queue browsing with client acknowledgement
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java


Revision: 489113
Author: bhupendrab
Date: 15:25:45, 20 December 2006
Message:
renamed the jar
----
Modified : /incubator/qpid/trunk/qpid/java/distribution/src/main/assembly/management-eclipse-plugin.xml


Revision: 489106
Author: ritchiem
Date: 14:54:01, 20 December 2006
Message:
QPID-101

Initial Implementation of Queue Browsing by Robert Godfrey and Martin Ritchie

AMQChannel.java - record messages browsed so as not to discard them on ack.
FilterManagerFactory.java - Added a NoConsumerFilter
ConcurrentSelectorDeliveryManager.java - Update to send browsers messages without taking the message from other consumers
Subscription.java - Added autoClose and isBrowser methods
SubscriptionTestHelper.java / RemoteSubscriptionImpl.java / SubscriptionImpl.java - implemented new interface methods
Added NoConsumerFilter.java


Patches from Rob Godfrey for client implementation
AMQSession.java - Added AUTO_CLOSE and NO_CONSUME properties to arguments FieldTable for consume method.
BasicMessageConsumer.java - updates to correctly close consumer when a BasicCancel is received from the broker.
AMQProtocolSession.java - method to allow cancellation of the client 
AMQStateManager.java - added handler for BasicCancelOkMethodHandler.java
Added new AMQQueueBrowser.java BasicCancelOkMethodHandler.java
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Added : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Added : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
Modified : /incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java


Revision: 489083
Author: ritchiem
Date: 13:26:12, 20 December 2006
Message:
Updated FilterTypes to be more accurate NO_CONSUME and AUTO_CLOSE
----
Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java


Revision: 489082
Author: ritchiem
Date: 13:22:27, 20 December 2006
Message:
QPID-233

Applied patch from Rupert Smith
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java


Revision: 489078
Author: ritchiem
Date: 12:57:27, 20 December 2006
Message:
Added new enum for AMQP Filter types
----
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java


Revision: 489070
Author: ritchiem
Date: 12:46:20, 20 December 2006
Message:
QPID-21 outstanding issues:
Fixed an issue where a consumer with no_local set would not have its filters applied to messages.
Fixed problem where new consumers would start with an empty PDQ rather than checking the existing queue of messages for messages of interest.

AMQQueue.java - Added code to check existing queue data for messages for the new subscriber with a filter.
DeliveryManager.java - added populatePreDeliveryQueue
SynchronizedDeliveryManager.java/ConcurrentDeliveryManager.java - implemented new DeliveryManager.java interface
SubscriptionImpl.java - fixed issue where no_local subscribers had their filters ignored.
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java


Revision: 488726
Author: ritchiem
Date: 17:02:19, 19 December 2006
Message:
QPID-222

ensured that the TXBuffer of a message is set to null when re queuing.
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java


Revision: 488715
Author: ritchiem
Date: 16:14:28, 19 December 2006
Message:
Maven output clean up.
Mainly removed exception stack traces from expected exceptions.
----
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java


Revision: 488713
Author: ritchiem
Date: 16:09:39, 19 December 2006
Message:
Maven output clean up.
Mainly removed exception stack traces from expected exceptions.
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java


Revision: 488712
Author: ritchiem
Date: 16:07:12, 19 December 2006
Message:
QPID-216
BasicConsumeMethodHandler.java - Pulled the nolocal param from the method body and passed down channel to subscription.
SubscriptionFactory.java / AMQQueue.java/AMQChannel.java - passed the nolocal parameter through to the Subscription
ConnectionStartOkMethodHandler.java - Saved the client properties so the client identifier can be used in comparison with the publisher id to implement no_local
AMQMinaProtocolSession.java - added _clientProperties to store the sent client properties.
AMQProtocolSession.java - interface changes to get/set ClientProperties
ConcurrentSelectorDeliveryManager.java - only need to do hasInterest as this will take care of the hasFilters optimisation check.
SubscriptionImpl.java - Added code to do comparison of client ids to determine interest in a given message.
SubscriptionSet.java - tidied up code to use hasInterest as this is where the nolocal is implemented.

ConnectionStartMethodHandler.java - Moved literal values to a ClientProperties.java enumeration and a QpidProperties.java values.
QpidConnectionMetaData.java - updated to get values from QpidProperties.java

MockProtocolSession.java - null implementation of new get/set methods
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java


Revision: 488711
Author: bhupendrab
Date: 16:00:13, 19 December 2006
Message:
QPID-188 
Unit test for Exchange MBeans
----
Added : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java


Revision: 488705
Author: bhupendrab
Date: 15:17:25, 19 December 2006
Message:
QPID-188 
Adding unit tests for Java broker JMX functionality
----
Added : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java


Revision: 488701
Author: bhupendrab
Date: 15:09:50, 19 December 2006
Message:
QPID-188 
Adding unit tests for Java broker JMX functionality
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java


Revision: 488624
Author: ritchiem
Date: 10:51:39, 19 December 2006
Message:
QPID-21
Added:
SelectorParser.jj - ActiveMQ selector javacc grammar used to generate SelectorParser.java
server/filter - Selector Filtering code from ActiveMQ project adjusted to suit our class and package structure.
server/message - Decorator classes to allow access to the JMSMessage inside the AMQMessage
ConcurrentSelectorDeliveryManager.java - A new DeliveryManager that utilises PreDeliveryQueues to implement selectors
AMQInvalidSelectorException.java - thrown on client and broker when the Selector text is invalid.
Common: log4j.properties to remove error log4j warnings on Common tests.

Modified:
broker/pom.xml - to generate SelectorParser.java
AMQChannel.java - Addition of argument fieldtable for filter setup.
BasicConsumeMethodHandler.java - writing of InvalidSelector channel close exception.
AMQMessage.java - Added decorator to get access to the enclosed JMSMessage
AMQQueue.java - Enhanced 'deliverymanager' property to allow the selection of the ConcurrentSelectorDeliveryManager.
Subscription.java - Enhanced interface to allow a subscription to state an 'interest' in a given message.
SubscriptionFactory.java - Added method to allow passing of filter arguments.
SubscriptionImpl.java - Implemented new Subscription.java methods.
SubscriptionManager.java - Added ability to get a list of current subscribers.
SubscriptionSet.java - augmented nextSubscriber to allow the subscriber to exert the new hasInterest feature.
SynchronizedDeliveryManager.java - fixed Logging class
AMQSession - Added filter extraction from consume call and pass it on to the registration.
ChannelCloseMethodHandler.java - Handle the reception and correct raising of the InvalidSelector Exception
AbstractJMSMessage.java - Expanded imports
BlockingMethodFrameListener.java - added extra info to a debug output line.
SocketTransportConnection.java - made output an info not a warn.
PropertiesFileInitialContextFactory.java - updated to allow the PROVIDER_URL to specify a property file to read in for the initial values.
ClusteredSubscriptionManager.java - Implementation of SubscriptionSet.java
NestedSubscriptionManager.java - Implementation of SubscriptionManager.java
RemoteSubscriptionImpl.java - Implementation of Subscription.java
AMQConstant.java - Added '322' "Invalid Selector"
SubscriptionTestHelper.java - Implementation of Subscription.java

Edited specs/amqp-8.0.xml to add field table to consume method.

Thanks to the ActiveMQ project for writing the initial SelectorParser.jj and associated filter Expressions.
----
Modified : /incubator/qpid/trunk/qpid/java/broker/pom.xml
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/grammar(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/grammar, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/grammar/SelectorParser.jj, Revision, 488302
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java, Revision, 488302
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java, Revision, 488302
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java, Revision, 488302
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java, Revision, 488302
Modified : /incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
Modified : /incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
Modified : /incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java, Revision, 488302
Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified : /incubator/qpid/trunk/qpid/specs/amqp-8.0.xml


Revision: 488596
Author: rgreig
Date: 09:29:19, 19 December 2006
Message:
QPID-215 : Patch supplied by Rob Godfrey - Implement custom JMSX properties
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
Added : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java


Revision: 488594
Author: bhupendrab
Date: 09:13:29, 19 December 2006
Message:
Name corrected
----
Added : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java(Copy from path: /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQueueMBeanTest.java, Revision, 488281
Deleted : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQueueMBeanTest.java


Revision: 488450
Author: vinoski
Date: 23:09:14, 18 December 2006
Message:
clean up warnings about unused variables

Remove all warnings in common, broker, client, and systests regarding
unused variables, as indicated by Eclipse builds.
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanIntrospector.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/plain/PlainSaslServer.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java


Added:
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/grammar/
      - copied from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/grammar/
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/grammar/SelectorParser.jj
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/
      - copied from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/message/
      - copied from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/
      - copied from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
    incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/log4j.properties
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
    incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
    incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/common/
      - copied from r489140, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/
    incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
    incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
    incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
      - copied unchanged from r489140, incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Removed:
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQueueMBeanTest.java
Modified:
    incubator/qpid/branches/perftesting/qpid/java/broker/pom.xml
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanIntrospector.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/plain/PlainSaslServer.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
    incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
    incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
    incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
    incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
    incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
    incubator/qpid/branches/perftesting/qpid/specs/amqp-8.0.xml

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/pom.xml?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/pom.xml (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/pom.xml Tue Jan 16 04:13:19 2007
@@ -55,6 +55,10 @@
             <artifactId>commons-lang</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jms_1.1_spec</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.mina</groupId>
             <artifactId>mina-filter-ssl</artifactId>
         </dependency>
@@ -82,6 +86,25 @@
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>javacc-maven-plugin</artifactId>
+                <version>2.0</version>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <configuration>
+                            <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory>
+                            <outputDirectory>${basedir}/target/generated</outputDirectory>
+                            <packageName>org.apache.qpid.server.filter.jms.selector</packageName>
+                        </configuration>
+                        <goals>
+                            <goal>javacc</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Jan 16 04:13:19 2007
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.ack.TxAck;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -45,6 +46,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -109,6 +112,7 @@
     private TxAck ackOp;
 
     private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
+    private Set<Long> _browsedAcks = new HashSet<Long>();
 
     public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
             throws AMQException
@@ -285,12 +289,14 @@
      * @param tag     the tag chosen by the client (if null, server will generate one)
      * @param queue   the queue to subscribe to
      * @param session the protocol session of the subscriber
+     * @param noLocal
      * @return the consumer tag. This is returned to the subscriber and used in
      *         subsequent unsubscribe requests
      * @throws ConsumerTagNotUniqueException if the tag is not unique
      * @throws AMQException                  if something goes wrong
      */
-    public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException
+    public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
+                                   FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
     {
         if (tag == null)
         {
@@ -301,7 +307,7 @@
             throw new ConsumerTagNotUniqueException();
         }
 
-        queue.registerProtocolSession(session, _channelId, tag, acks);
+        queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal);
         _consumerTag2QueueMap.put(tag, queue);
         return tag;
     }
@@ -379,6 +385,8 @@
         {
             if (unacked.queue != null)
             {
+                unacked.message.setTxnBuffer(null);
+
                 unacked.queue.deliver(unacked.message);
             }
         }
@@ -498,7 +506,7 @@
         if (_log.isDebugEnabled())
         {
             _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag +
-                      " and multiple " + multiple);
+                       " and multiple " + multiple);
         }
         if (multiple)
         {
@@ -550,7 +558,14 @@
 
             for (UnacknowledgedMessage msg : acked)
             {
-                msg.discard();
+                if (!_browsedAcks.contains(deliveryTag))
+                {
+                    msg.discard();
+                }
+                else
+                {
+                    _browsedAcks.remove(deliveryTag);
+                }
             }
 
         }
@@ -567,7 +582,16 @@
                 _log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
                 throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
             }
-            msg.discard();
+
+            if (!_browsedAcks.contains(deliveryTag))
+            {
+                msg.discard();
+            }
+            else
+            {
+                _browsedAcks.remove(deliveryTag);
+            }
+
             if (_log.isTraceEnabled())
             {
                 _log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
@@ -686,6 +710,12 @@
             session.writeFrame(block);
         }
         _returns.clear();
+    }
+
+    public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue)
+    {
+        _browsedAcks.add(deliveryTag);
+        addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
     }
 
     //we use this wrapper to ensure we are always using the correct

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Tue Jan 16 04:13:19 2007
@@ -522,10 +522,10 @@
          */
         public void unregisterExchange(String exchangeName) throws JMException
         {
-            boolean inUse = false;
             // TODO
             // Check if the exchange is in use.
-            // Check if there are queue-bindings with the exchnage and unregister
+        	// boolean inUse = false;
+            // Check if there are queue-bindings with the exchange and unregister
             // when there are no bindings.
             try
             {

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Jan 16 04:13:19 2007
@@ -21,10 +21,12 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.framing.BasicConsumeBody;
 import org.apache.qpid.framing.BasicConsumeOkBody;
 import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ConsumerTagNotUniqueException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -32,6 +34,7 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.log4j.Logger;
@@ -68,14 +71,15 @@
         {
             AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
 
-            if(queue == null)
+            if (queue == null)
             {
                 _log.info("No queue for '" + body.queue + "'");
             }
             try
             {
-                String consumerTag = channel.subscribeToQueue(body.consumerTag, queue,  session, !body.noAck);
-                if(!body.nowait)
+                String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
+                                                              body.arguments, body.noLocal);
+                if (!body.nowait)
                 {
                     session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
                 }
@@ -83,10 +87,19 @@
                 //now allow queue to start async processing of any backlog of messages
                 queue.deliverAsync();
             }
-            catch(ConsumerTagNotUniqueException e)
+            catch (AMQInvalidSelectorException ise)
+            {
+                _log.info("Closing connection due to invalid selector");
+                session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
+                                                                   ise.getMessage(), BasicConsumeBody.CLASS_ID,
+                                                                   BasicConsumeBody.METHOD_ID));
+            }
+            catch (ConsumerTagNotUniqueException e)
             {
                 String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
-                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID));
+                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
+                                                                      BasicConsumeBody.CLASS_ID,
+                                                                      BasicConsumeBody.METHOD_ID));
             }
         }
     }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Tue Jan 16 04:13:19 2007
@@ -78,12 +78,19 @@
 
             AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
 
+            //save clientProperties
+            if (protocolSession.getClientProperties() == null)
+            {
+                protocolSession.setClientProperties(body.clientProperties);
+            }
+
             switch (authResult.status)
             {
                 case ERROR:
                     throw new AMQException("Authentication failed");
                 case SUCCESS:
                     _logger.info("Connected as: " + ss.getAuthorizationID());
+
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
                     AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
                                                                       HeartbeatConfig.getInstance().getDelay());
@@ -122,7 +129,7 @@
     static int getConfiguredFrameSize()
     {
         final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
-        final int framesize =  config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
+        final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
         _logger.info("Framesize set to " + framesize);
         return framesize;
     }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanIntrospector.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanIntrospector.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanIntrospector.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanIntrospector.java Tue Jan 16 04:13:19 2007
@@ -61,7 +61,6 @@
          */
         for (Method method : interfaceClass.getMethods())
         {
-            int    argCount = method.getParameterTypes().length;
             String name = method.getName();
             Class<?>  resultType = method.getReturnType();
             MBeanAttributeInfo attributeInfo = null;

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Tue Jan 16 04:13:19 2007
@@ -26,17 +26,19 @@
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ConnectionStartBody;
 import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ProtocolVersionList;
 import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionStartBody;
 import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.codec.AMQDecoder;
+
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -89,10 +91,11 @@
     private boolean _closed;
     // maximum number of channels this session should have
     private long _maxNoOfChannels = 1000;
-    
+
     /* AMQP Version for this session */
     private byte _major;
     private byte _minor;
+    private FieldTable _clientProperties;
 
     public ManagedObject getManagedObject()
     {
@@ -128,7 +131,7 @@
         {
             return new AMQProtocolSessionMBean(this);
         }
-        catch(JMException ex)
+        catch (JMException ex)
         {
             _logger.error("AMQProtocolSession MBean creation has failed ", ex);
             throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
@@ -153,18 +156,21 @@
         {
             ProtocolInitiation pi = (ProtocolInitiation) message;
             // this ensures the codec never checks for a PI message again
-            ((AMQDecoder)_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
-            try {
+            ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+            try
+            {
                 pi.checkVersion(this); // Fails if not correct
                 // This sets the protocol version (and hence framing classes) for this session.
                 _major = pi.protocolMajor;
                 _minor = pi.protocolMinor;
                 String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
                 String locales = "en_US";
-                AMQFrame response = ConnectionStartBody.createAMQFrame((short)0, pi.protocolMajor, pi.protocolMinor, null,
+                AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null,
                                                                        mechanisms.getBytes(), locales.getBytes());
                 _minaProtocolSession.write(response);
-            } catch (AMQException e) {
+            }
+            catch (AMQException e)
+            {
                 _logger.error("Received incorrect protocol initiation", e);
                 /* Find last protocol version in protocol version list. Make sure last protocol version
                 listed in the build file (build-module.xml) is the latest version which will be used
@@ -211,7 +217,7 @@
             _logger.debug("Method frame received: " + frame);
         }
         final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel,
-                                                                                    (AMQMethodBody)frame.bodyFrame);
+                                                                                    (AMQMethodBody) frame.bodyFrame);
         try
         {
             boolean wasAnyoneInterested = false;
@@ -266,7 +272,7 @@
         {
             _logger.debug("Content header frame received: " + frame);
         }
-        getChannel(frame.channel).publishContentHeader((ContentHeaderBody)frame.bodyFrame);
+        getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame);
     }
 
     private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -275,7 +281,7 @@
         {
             _logger.debug("Content body frame received: " + frame);
         }
-        getChannel(frame.channel).publishContentBody((ContentBody)frame.bodyFrame);
+        getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame);
     }
 
     /**
@@ -355,6 +361,7 @@
      * Close a specific channel. This will remove any resources used by the channel, including:
      * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
      * </ul>
+     *
      * @param channelId id of the channel to close
      * @throws AMQException if an error occurs closing the channel
      * @throws IllegalArgumentException if the channel id is not valid
@@ -381,6 +388,7 @@
 
     /**
      * In our current implementation this is used by the clustering code.
+     *
      * @param channelId
      */
     public void removeChannel(int channelId)
@@ -390,11 +398,12 @@
 
     /**
      * Initialise heartbeats on the session.
+     *
      * @param delay delay in seconds (not ms)
      */
     public void initHeartbeats(int delay)
     {
-        if(delay > 0)
+        if (delay > 0)
         {
             _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
             _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay));
@@ -404,6 +413,7 @@
     /**
      * Closes all channels that were opened by this protocol session. This frees up all resources
      * used by the channel.
+     *
      * @throws AMQException if an error occurs while closing any channel
      */
     private void closeAllChannels() throws AMQException
@@ -412,6 +422,7 @@
         {
             channel.close(this);
         }
+        _channelMap.clear();
     }
 
     /**
@@ -420,7 +431,7 @@
      */
     public void closeSession() throws AMQException
     {
-        if(!_closed)
+        if (!_closed)
         {
             _closed = true;
             closeAllChannels();
@@ -462,11 +473,11 @@
         // information is used by SASL primary.
         if (address instanceof InetSocketAddress)
         {
-            return ((InetSocketAddress)address).getHostName();
+            return ((InetSocketAddress) address).getHostName();
         }
         else if (address instanceof VmPipeAddress)
         {
-            return "vmpipe:" + ((VmPipeAddress)address).getPort();
+            return "vmpipe:" + ((VmPipeAddress) address).getPort();
         }
         else
         {
@@ -483,22 +494,32 @@
     {
         _saslServer = saslServer;
     }
-    
+
+    public FieldTable getClientProperties()
+    {
+        return _clientProperties;
+    }
+
+    public void setClientProperties(FieldTable clientProperties)
+    {
+        _clientProperties = clientProperties;
+    }
+
     /**
      * Convenience methods for managing AMQP version.
      * NOTE: Both major and minor will be set to 0 prior to protocol initiation.
      */
-    
+
     public byte getAmqpMajor()
     {
         return _major;
     }
-    
+
     public byte getAmqpMinor()
     {
         return _minor;
     }
-    
+
     public boolean amqpVersionEquals(byte major, byte minor)
     {
         return _major == major && _minor == minor;

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Jan 16 04:13:19 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol;
 
 import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.AMQException;
 
@@ -122,4 +123,9 @@
      * @param saslServer
      */
     void setSaslServer(SaslServer saslServer);
+
+
+    FieldTable getClientProperties();
+
+    void setClientProperties(FieldTable clientProperties);
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Jan 16 04:13:19 2007
@@ -25,6 +25,8 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.message.jms.JMSMessage;
 import org.apache.qpid.AMQException;
 
 import java.util.ArrayList;
@@ -33,17 +35,21 @@
 import java.util.Set;
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Combines the information that make up a deliverable message into a more manageable form.
  */
 public class AMQMessage
 {
+    public static final String JMS_MESSAGE = "jms.message";
+
     private final Set<Object> _tokens = new HashSet<Object>();
 
     private AMQProtocolSession _publisher;
 
-    private final BasicPublishBody  _publishBody;
+    private final BasicPublishBody _publishBody;
 
     private ContentHeaderBody _contentHeaderBody;
 
@@ -83,6 +89,8 @@
      * messages published with the 'immediate' flag.
      */
     private boolean _deliveredToConsumer;
+    private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
+    private AtomicBoolean _taken;
 
 
     public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
@@ -96,7 +104,9 @@
         _publishBody = publishBody;
         _store = messageStore;
         _contentBodies = new LinkedList<ContentBody>();
+        _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
         _storeWhenComplete = storeWhenComplete;
+        _taken = new AtomicBoolean(false);
     }
 
     public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
@@ -107,6 +117,7 @@
         _publishBody = publishBody;
         _contentHeaderBody = contentHeaderBody;
         _contentBodies = contentBodies;
+        _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
         _messageId = messageId;
         _store = store;
         storeMessage();
@@ -271,7 +282,7 @@
             {
                 _store.removeMessage(_messageId);
             }
-            catch(AMQException e)
+            catch (AMQException e)
             {
                 //to maintain consistency, we revert the count
                 incrementReference();
@@ -292,7 +303,7 @@
 
     public boolean checkToken(Object token)
     {
-        if(_tokens.contains(token))
+        if (_tokens.contains(token))
         {
             return true;
         }
@@ -308,7 +319,7 @@
         //if the message is not persistent or the queue is not durable
         //we will not need to recover the association and so do not
         //need to record it
-        if(isPersistent() && queue.isDurable())
+        if (isPersistent() && queue.isDurable())
         {
             _store.enqueueMessage(queue.getName(), _messageId);
         }
@@ -318,7 +329,7 @@
     {
         //only record associations where both queue and message will survive
         //a restart, so only need to remove association if this is the case
-        if(isPersistent() && queue.isDurable())
+        if (isPersistent() && queue.isDurable())
         {
             _store.dequeueMessage(queue.getName(), _messageId);
         }
@@ -326,14 +337,14 @@
 
     public boolean isPersistent() throws AMQException
     {
-        if(_contentHeaderBody == null)
+        if (_contentHeaderBody == null)
         {
             throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
         }
 
         //todo remove literal values to a constant file such as AMQConstants in common
         return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
-                &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+               && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
     }
 
     public void setTxnBuffer(TxnBuffer buffer)
@@ -352,8 +363,9 @@
      * immediate delivery but has not been marked as delivered to a
      * consumer
      */
-    public void checkDeliveredToConsumer() throws NoConsumersException{
-        if(isImmediate() && !_deliveredToConsumer)
+    public void checkDeliveredToConsumer() throws NoConsumersException
+    {
+        if (isImmediate() && !_deliveredToConsumer)
         {
             throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
         }
@@ -362,8 +374,64 @@
     /**
      * Called when this message is delivered to a consumer. (used to
      * implement the 'immediate' flag functionality).
+     * Also used by selectors to determine if the message has already been sent.
      */
-    public void setDeliveredToConsumer(){
+    public void setDeliveredToConsumer()
+    {
         _deliveredToConsumer = true;
+    }
+
+    /**
+     * Called by selectors to determine if the message has already been sent.
+     * @return true if this message has been marked as delivered to a consumer
+     */
+    public boolean getDeliveredToConsumer()
+    {
+        return _deliveredToConsumer;
+    }
+
+
+    public MessageDecorator getDecodedMessage(String type)
+    {
+        MessageDecorator msgtype = null;
+
+        if (_decodedMessages != null)
+        {
+            msgtype = _decodedMessages.get(type);
+
+            if (msgtype == null)
+            {
+                msgtype = decorateMessage(type);
+            }
+        }
+
+        return msgtype;
+    }
+
+    private MessageDecorator decorateMessage(String type)
+    {
+        MessageDecorator msgdec = null;
+
+        if (type.equals(JMS_MESSAGE))
+        {
+            msgdec = new JMSMessage(this);
+        }
+
+        if (msgdec != null)
+        {
+            _decodedMessages.put(type, msgdec);
+        }
+
+        return msgdec;
+    }
+
+    public boolean taken()
+    {
+        return _taken.getAndSet(true);
+    }
+
+    public void release()
+    {
+        _taken.set(false);
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Jan 16 04:13:19 2007
@@ -22,6 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
@@ -95,7 +96,7 @@
      * max allowed number of messages on a queue.
      */
     private Integer _maxMessageCount = 10000;
-    
+
     /**
      * max queue depth(KB) for the queue
      */
@@ -187,16 +188,29 @@
         _subscribers = subscribers;
         _subscriptionFactory = subscriptionFactory;
 
-        //fixme - Pick one.
-        if (Boolean.getBoolean("concurrentdeliverymanager"))
+        //fixme - Make this configurable via the broker config.xml
+        if (System.getProperties().getProperty("deliverymanager") != null)
         {
-            _logger.info("Using ConcurrentDeliveryManager");
-            _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+            if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
+            {
+                _logger.info("Using ConcurrentSelectorDeliveryManager");
+                _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+            }
+            else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
+            {
+                _logger.info("Using ConcurrentDeliveryManager");
+                _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+            }
+            else
+            {
+                _logger.info("Using SynchronizedDeliveryManager");
+                _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+            }
         }
         else
         {
-            _logger.info("Using SynchronizedDeliveryManager");
-            _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+            _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
+            _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
         }
     }
 
@@ -348,12 +362,26 @@
         _bindings.addBinding(routingKey, exchange);
     }
 
-    public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks)
+    public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+    {
+        registerProtocolSession(ps, channel, consumerTag, acks, filters, false);
+    }
+
+    public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
             throws AMQException
     {
         debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
 
-        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks);
+        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
+
+        if(subscription.hasFilters())
+        {
+            if (_deliveryMgr.hasQueuedMessages())
+            {
+                _deliveryMgr.populatePreDeliveryQueue(subscription);   
+            }
+        }
+
         _subscribers.addSubscriber(subscription);
     }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java Tue Jan 16 04:13:19 2007
@@ -198,6 +198,11 @@
         return new ArrayList<AMQMessage>(_messages);
     }
 
+    public void populatePreDeliveryQueue(Subscription subscription)
+    {
+        // no-op: this DeliveryManager has no PreDeliveryQueues
+    }
+
     public synchronized void removeAMessageFromTop() throws AMQException
     {
         AMQMessage msg = poll();
@@ -312,7 +317,6 @@
                 else
                 {
                     s.send(msg, _queue);
-                    msg.setDeliveredToConsumer();
                 }
             }
             finally

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Tue Jan 16 04:13:19 2007
@@ -73,4 +73,6 @@
     void clearAllMessages() throws AMQException;
 
     List<AMQMessage> getMessages();
+
+    void populatePreDeliveryQueue(Subscription subscription);
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Tue Jan 16 04:13:19 2007
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.AMQException;
+
+import java.util.Queue;
+
 public interface Subscription
 {
     void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
@@ -27,4 +31,18 @@
     boolean isSuspended();
 
     void queueDeleted(AMQQueue queue);
+
+    boolean hasFilters();
+
+    boolean hasInterest(AMQMessage msg);
+
+    Queue<AMQMessage> getPreDeliveryQueue();
+
+    void enqueueForPreDelivery(AMQMessage msg);
+
+    boolean isAutoClose();
+
+    void close();
+
+    boolean isBrowser();   
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java Tue Jan 16 04:13:19 2007
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
 
 /**
  * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
@@ -32,9 +33,10 @@
  */
 public interface SubscriptionFactory
 {
-    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
-        throws AMQException;
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks,
+                                    FieldTable filters, boolean noLocal) throws AMQException;
 
-    Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag)
-        throws AMQException;
+
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+            throws AMQException;
 }