You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/28 14:02:48 UTC

svn commit: r1225178 [1/8] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/ bdbstore/src/test/ bdbstore/src/test/jav...

Author: rgodfrey
Date: Wed Dec 28 13:02:41 2011
New Revision: 1225178

URL: http://svn.apache.org/viewvc?rev=1225178&view=rev
Log:
QPID-3714 : [Java] Performance Improvements

Persistence:

Store message in same transaction as enqueue if possible

Memory:

Remove unnecessary (un)boxing
Reduce unnecessary copying of message data
Cache short strings
Cache queues for a given routing key on an Exchange
(0-9) Use a fixed size buffer for preparing frames to write out

Other:

Reduce calls to System.currentTimeMillis
(0-10) Special case immutable RangeSets, in particular RangeSets of a single range/point
(0-10) Special case delivery properties and message properties in headers
(0-9) Send commit-ok as soon as data is committed to store
Cache publishing access control queries 
(0-9) Optimised long and int typed values for FieldTables
(0-9) Retain FieldTable encoded form
(0-9) Cache queue and topic destinations

Added:
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java
Modified:
    qpid/trunk/qpid/java/   (props changed)
    qpid/trunk/qpid/java/bdbstore/src/main/   (props changed)
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
    qpid/trunk/qpid/java/bdbstore/src/test/   (props changed)
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
    qpid/trunk/qpid/java/broker-plugins/access-control/src/main/   (props changed)
    qpid/trunk/qpid/java/broker-plugins/access-control/src/test/   (props changed)
    qpid/trunk/qpid/java/broker-plugins/experimental/info/src/main/   (props changed)
    qpid/trunk/qpid/java/broker-plugins/experimental/info/src/test/   (props changed)
    qpid/trunk/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/test/   (props changed)
    qpid/trunk/qpid/java/broker-plugins/experimental/shutdown/src/main/   (props changed)
    qpid/trunk/qpid/java/broker-plugins/extras/src/main/   (props changed)
    qpid/trunk/qpid/java/broker-plugins/extras/src/test/   (props changed)
    qpid/trunk/qpid/java/broker-plugins/firewall/src/main/   (props changed)
    qpid/trunk/qpid/java/broker-plugins/firewall/src/test/   (props changed)
    qpid/trunk/qpid/java/broker/src/main/   (props changed)
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java   (contents, props changed)
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java   (contents, props changed)
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java   (contents, props changed)
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
    qpid/trunk/qpid/java/broker/src/test/   (props changed)
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
    qpid/trunk/qpid/java/broker/src/velocity/   (props changed)
    qpid/trunk/qpid/java/client/example/src/main/   (props changed)
    qpid/trunk/qpid/java/client/src/main/   (props changed)
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
    qpid/trunk/qpid/java/client/src/test/   (props changed)
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/   (props changed)
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
    qpid/trunk/qpid/java/client/test/   (props changed)
    qpid/trunk/qpid/java/common/Composite.tpl
    qpid/trunk/qpid/java/common/src/main/   (props changed)
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
    qpid/trunk/qpid/java/common/src/test/   (props changed)
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/test/   (props changed)
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java
    qpid/trunk/qpid/java/common/templates/method/version/MethodBodyClass.vm
    qpid/trunk/qpid/java/common/templates/model/MethodRegistryClass.vm
    qpid/trunk/qpid/java/common/templates/model/version/MethodRegistryClass.vm
    qpid/trunk/qpid/java/integrationtests/src/main/   (props changed)
    qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/test/   (props changed)
    qpid/trunk/qpid/java/junit-toolkit/src/   (props changed)
    qpid/trunk/qpid/java/junit-toolkit/src/main/   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/   (props changed)
    qpid/trunk/qpid/java/management/common/src/test/   (props changed)
    qpid/trunk/qpid/java/management/eclipse-plugin/src/main/   (props changed)
    qpid/trunk/qpid/java/management/eclipse-plugin/src/test/   (props changed)
    qpid/trunk/qpid/java/management/example/src/main/   (props changed)
    qpid/trunk/qpid/java/perftests/src/main/   (props changed)
    qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/test/   (props changed)
    qpid/trunk/qpid/java/systests/src/main/   (props changed)
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/   (props changed)
    qpid/trunk/qpid/java/testkit/src/main/   (props changed)
    qpid/trunk/qpid/java/tools/src/main/   (props changed)

Propchange: qpid/trunk/qpid/java/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -1,9 +1,8 @@
 build.overrides
-blaze.ipr
-blaze.iws
+*.ipr
+*.iws
 build
-intellijclasses
-qpid.iml
-qpid.ipr
-qpid.iws
+*.iml
+*.iws
 work
+

Propchange: qpid/trunk/qpid/java/bdbstore/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Wed Dec 28 13:02:41 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.ber
 
 import java.io.File;
 import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.LinkedList;
@@ -32,6 +33,8 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.*;
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQStoreException;
@@ -43,6 +46,7 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
 import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -70,17 +74,6 @@ import org.apache.qpid.server.store.berk
 import com.sleepycat.bind.EntryBinding;
 import com.sleepycat.bind.tuple.ByteBinding;
 import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.TransactionConfig;
 
 /**
  * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -91,7 +84,7 @@ import com.sleepycat.je.TransactionConfi
  * dequeue messages to queues. <tr><td> Generate message identifiers. </table>
  */
 @SuppressWarnings({"unchecked"})
-public class BDBMessageStore implements MessageStore
+public class BDBMessageStore implements MessageStore, DurableConfigurationStore
 {
     private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
 
@@ -205,18 +198,15 @@ public class BDBMessageStore implements 
                                      Configuration storeConfiguration,
                                      LogSubject logSubject) throws Exception
     {
-        _logSubject = logSubject;
-        CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
+        CurrentActor.get().message(logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
 
-        if(_configured)
+        if(!_configured)
         {
-            throw new Exception("ConfigStore already configured");
+            _logSubject = logSubject;
+            configure(name,storeConfiguration);
+            _configured = true;
+            stateTransition(State.CONFIGURING, State.CONFIGURED);
         }
-
-        configure(name,storeConfiguration);
-        
-        _configured = true;
-        stateTransition(State.CONFIGURING, State.CONFIGURED);
         
         recover(recoveryHandler);
         stateTransition(State.RECOVERING, State.STARTED);
@@ -227,24 +217,31 @@ public class BDBMessageStore implements 
                                       Configuration storeConfiguration,
                                       LogSubject logSubject) throws Exception
     {
-        CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+        CurrentActor.get().message(logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
 
         if(!_configured)
         {
-            throw new Exception("ConfigStore not configured");
+            _logSubject = logSubject;
+            configure(name,storeConfiguration);
+            _configured = true;
+            stateTransition(State.CONFIGURING, State.CONFIGURED);
         }
-
+        
         recoverMessages(recoveryHandler);
     }
 
     public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
             Configuration storeConfiguration, LogSubject logSubject) throws Exception
     {
-        CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
+        CurrentActor.get().message(logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
+
 
         if(!_configured)
         {
-            throw new Exception("ConfigStore not configured");
+            _logSubject = logSubject;
+            configure(name,storeConfiguration);
+            _configured = true;
+            stateTransition(State.CONFIGURING, State.CONFIGURED);
         }
 
         recoverQueueEntries(recoveryHandler);
@@ -252,7 +249,7 @@ public class BDBMessageStore implements 
         
     }
 
-    public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction()
+    public org.apache.qpid.server.store.MessageStore.Transaction newTransaction()
     {
         return new BDBTransaction();
     }
@@ -686,8 +683,6 @@ public class BDBMessageStore implements 
         {
             cursor = _messageMetaDataDb.openCursor(null, null);
             DatabaseEntry key = new DatabaseEntry();
-            EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);;
-
             DatabaseEntry value = new DatabaseEntry();
             EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance();
 
@@ -695,7 +690,7 @@ public class BDBMessageStore implements 
 
             while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
             {
-                long messageId = (Long) keyBinding.entryToObject(key);
+                long messageId = LongBinding.entryToLong(key);
                 StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value);
 
                 StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
@@ -781,10 +776,15 @@ public class BDBMessageStore implements 
      *
      * @param messageId Identifies the message to remove.
      *
-     * @throws AMQInternalException If the operation fails for any reason.
+     * @throws AMQStoreException If the operation fails for any reason.
      */
-    public void removeMessage(Long messageId) throws AMQStoreException
+    public void removeMessage(long messageId) throws AMQStoreException
+    {
+        removeMessage(messageId, true);
+    }
+    public void removeMessage(long messageId, boolean sync) throws AMQStoreException
     {
+
         // _log.debug("public void removeMessage(Long messageId = " + messageId): called");
 
         com.sleepycat.je.Transaction tx = null;
@@ -796,8 +796,7 @@ public class BDBMessageStore implements 
             
             //remove the message meta data from the store
             DatabaseEntry key = new DatabaseEntry();
-            EntryBinding metaKeyBindingTuple = TupleBinding.getPrimitiveBinding(Long.class);
-            metaKeyBindingTuple.objectToEntry(messageId, key);
+            LongBinding.longToEntry(messageId, key);
 
             if (_log.isDebugEnabled())
             {
@@ -808,9 +807,8 @@ public class BDBMessageStore implements 
             OperationStatus status = _messageMetaDataDb.delete(tx, key);
             if (status == OperationStatus.NOTFOUND)
             {
-                tx.abort();
-
-                throw new AMQStoreException("Message metadata not found for message id " + messageId);
+                _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
+                messageId);
             }
 
             if (_log.isDebugEnabled())
@@ -868,7 +866,7 @@ public class BDBMessageStore implements 
             cursor.close();
             cursor = null;
             
-            commit(tx, true);
+            commit(tx, sync);
         }
         catch (DatabaseException e)
         {
@@ -1174,11 +1172,12 @@ public class BDBMessageStore implements 
      *
      * @throws AMQStoreException If the operation fails for any reason.
      */
-    public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+    public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+                               long messageId) throws AMQStoreException
     {
         // _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called");
 
-        AMQShortString name = new AMQShortString(queue.getResourceName());
+        AMQShortString name = AMQShortString.valueOf(queue.getResourceName());
         
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = new QueueEntryTB();
@@ -1212,7 +1211,8 @@ public class BDBMessageStore implements 
      *
      * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
      */
-    public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+    public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+                               long messageId) throws AMQStoreException
     {
         AMQShortString name = new AMQShortString(queue.getResourceName());
 
@@ -1383,7 +1383,7 @@ public class BDBMessageStore implements 
      *
      * @return A fresh message id.
      */
-    public Long getNewMessageId()
+    public long getNewMessageId()
     {
         return _messageId.incrementAndGet();
     }
@@ -1398,7 +1398,7 @@ public class BDBMessageStore implements 
      *
      * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
      */
-    protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int offset, 
+    protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
                                       ByteBuffer contentBody) throws AMQStoreException
     {
         DatabaseEntry key = new DatabaseEntry();
@@ -1436,7 +1436,8 @@ public class BDBMessageStore implements 
      *
      * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
      */
-    private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId, StorableMessageMetaData messageMetaData)
+    private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId, 
+                               StorableMessageMetaData messageMetaData)
             throws AMQStoreException
     {
         if (_log.isDebugEnabled())
@@ -1446,8 +1447,7 @@ public class BDBMessageStore implements 
         }
 
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
-        keyBinding.objectToEntry(messageId, key);
+        LongBinding.longToEntry(messageId, key);
         DatabaseEntry value = new DatabaseEntry();
         
         TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
@@ -1475,7 +1475,7 @@ public class BDBMessageStore implements 
      *
      * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
      */
-    public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQStoreException
+    public StorableMessageMetaData getMessageMetaData(long messageId) throws AMQStoreException
     {
         if (_log.isDebugEnabled())
         {
@@ -1484,8 +1484,7 @@ public class BDBMessageStore implements 
         }
 
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
-        keyBinding.objectToEntry(messageId, key);
+        LongBinding.longToEntry(messageId, key);
         DatabaseEntry value = new DatabaseEntry();
         TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
 
@@ -1519,7 +1518,7 @@ public class BDBMessageStore implements 
      *
      * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
      */
-    public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQStoreException
+    public int getContent(long messageId, int offset, ByteBuffer dst) throws AMQStoreException
     {    
         DatabaseEntry contentKeyEntry = new DatabaseEntry();
         
@@ -1778,7 +1777,6 @@ public class BDBMessageStore implements 
             {
                 _log.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
             }
-
             _complete = true;
 
             notifyAll();
@@ -1799,7 +1797,7 @@ public class BDBMessageStore implements 
         {
             //_log.debug("public void commit(): called");
 
-            _commitThread.addJob(this);
+            _commitThread.addJob(this, _syncCommit);
             
             if(!_syncCommit)
             {
@@ -1807,28 +1805,14 @@ public class BDBMessageStore implements 
                 return;
             }
             
-            synchronized (BDBCommitFuture.this)
-            {
-                while (!_complete)
-                {
-                    try
-                    {
-                        wait(250);
-                    }
-                    catch (InterruptedException e)
-                    {
-                        // _log.error("Unexpected thread interruption: " + e, e);
-                        throw new RuntimeException(e);
-                    }
-                }
+            waitForCompletion();
+            // _log.debug("Commit completed, _databaseException = " + _databaseException);
 
-                // _log.debug("Commit completed, _databaseException = " + _databaseException);
-
-                if (_databaseException != null)
-                {
-                    throw _databaseException;
-                }
+            if (_databaseException != null)
+            {
+                throw _databaseException;
             }
+
         }
 
         public synchronized boolean isComplete()
@@ -1836,10 +1820,11 @@ public class BDBMessageStore implements 
             return _complete;
         }
 
-        public void waitForCompletion()
+        public synchronized void waitForCompletion()
         {
             while (!isComplete())
             {
+                _commitThread.explicitNotify();
                 try
                 {
                     wait(250);
@@ -1866,7 +1851,7 @@ public class BDBMessageStore implements 
         // private final Logger _log = Logger.getLogger(CommitThread.class);
 
         private final AtomicBoolean _stopped = new AtomicBoolean(false);
-        private final AtomicReference<Queue<BDBCommitFuture>> _jobQueue = new AtomicReference<Queue<BDBCommitFuture>>(new ConcurrentLinkedQueue<BDBCommitFuture>());
+        private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
         private final CheckpointConfig _config = new CheckpointConfig();
         private final Object _lock = new Object();
 
@@ -1877,6 +1862,14 @@ public class BDBMessageStore implements 
 
         }
 
+        public void explicitNotify()
+        {
+            synchronized (_lock)
+            {
+                _lock.notify();
+            }
+        }
+
         public void run()
         {
             while (!_stopped.get())
@@ -1905,24 +1898,25 @@ public class BDBMessageStore implements 
         {
             // _log.debug("private void processJobs(): called");
 
-            // we replace the old queue atomically with a new one and this avoids any need to
-            // copy elements out of the queue
-            Queue<BDBCommitFuture> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<BDBCommitFuture>());
+            int size = _jobQueue.size();
 
             try
             {
-                // _environment.checkpoint(_config);
+                //TODO - upgrade to BDB 5.0, then use: _environment.flushLog(true);
                 _environment.sync();
 
-                for (BDBCommitFuture commit : jobs)
+                for(int i = 0; i < size; i++)
                 {
+                    BDBCommitFuture commit = _jobQueue.poll();
                     commit.complete();
                 }
+
             }
             catch (DatabaseException e)
             {
-                for (BDBCommitFuture commit : jobs)
+                for(int i = 0; i < size; i++)
                 {
+                    BDBCommitFuture commit = _jobQueue.poll();
                     commit.abort(e);
                 }
             }
@@ -1931,15 +1925,19 @@ public class BDBMessageStore implements 
 
         private boolean hasJobs()
         {
-            return !_jobQueue.get().isEmpty();
+            return !_jobQueue.isEmpty();
         }
 
-        public void addJob(BDBCommitFuture commit)
+        public void addJob(BDBCommitFuture commit, final boolean sync)
         {
-            synchronized (_lock)
+
+            _jobQueue.add(commit);
+            if(sync)
             {
-                _jobQueue.get().add(commit);
-                _lock.notifyAll();
+                synchronized (_lock)
+                {
+                    _lock.notifyAll();
+                }
             }
         }
 
@@ -1959,7 +1957,10 @@ public class BDBMessageStore implements 
 
         private final long _messageId;
         private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
-        private com.sleepycat.je.Transaction _txn;
+        
+        private StorableMessageMetaData _metaData;
+        private volatile SoftReference<byte[]> _dataRef;
+        private byte[] _data;
 
         StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
         {
@@ -1973,22 +1974,15 @@ public class BDBMessageStore implements 
             try
             {
                 _messageId = messageId;
+                _metaData = metaData;
 
                 _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
-                if(persist)
-                {
-                    _txn = _environment.beginTransaction(null, null);
-                    storeMetaData(_txn, messageId, metaData);
-                }
+
             }
             catch (DatabaseException e)
             {
                 throw new RuntimeException(e);
             }
-            catch (AMQStoreException e)
-            {
-                throw new RuntimeException(e);
-            }
 
         }
 
@@ -2018,58 +2012,114 @@ public class BDBMessageStore implements 
 
         public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
         {
-            try
+            src = src.slice();
+            
+            if(_data == null)
             {
-                BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src);
+                _data = new byte[src.remaining()];
+                _dataRef = new SoftReference<byte[]>(_data);
+                src.duplicate().get(_data);
             }
-            catch (AMQStoreException e)
+            else
             {
-                throw new RuntimeException(e);
+                byte[] oldData = _data;
+                _data = new byte[oldData.length + src.remaining()];
+                _dataRef = new SoftReference<byte[]>(_data);
+
+                System.arraycopy(oldData,0,_data,0,oldData.length);
+                src.duplicate().get(_data, oldData.length, src.remaining());
             }
+            
         }
 
         public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
         {
-            try
+            byte[] data = _dataRef == null ? null : _dataRef.get();
+            if(data != null)
             {
-                return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+                int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+                dst.put(data, offsetInMessage, length);
+                return length;
             }
-            catch (AMQStoreException e)
+            else
             {
-                throw new RuntimeException(e);
+                try
+                {
+                    return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+                }
+                catch (AMQStoreException e)
+                {
+                    throw new RuntimeException(e);
+                }
             }
         }
 
-        public StoreFuture flushToStore()
+        public ByteBuffer getContent(int offsetInMessage, int size)
         {
-            try
+            byte[] data = _dataRef == null ? null : _dataRef.get();
+            if(data != null)
             {
-                if(_txn != null)
-                {
-                    //if(_log.isDebugEnabled())
-                    //{
-                    //   _log.debug("Flushing message " + _messageId + " to store");
-                    //}
-                    BDBMessageStore.this.commitTranImpl(_txn, true);
-                }
+                return ByteBuffer.wrap(data,offsetInMessage,size);
             }
-            catch (AMQStoreException e)
+            else
             {
-                throw new RuntimeException(e);
+                ByteBuffer buf = ByteBuffer.allocate(size);
+                getContent(offsetInMessage, buf);
+                buf.position(0);
+                return  buf;
             }
-            finally
+        }
+
+        synchronized void store(com.sleepycat.je.Transaction txn)
+        {
+
+            if(_metaData != null)
             {
-                _txn = null;
+                try
+                {
+                    _dataRef = new SoftReference<byte[]>(_data);
+                    BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
+                    BDBMessageStore.this.addContent(txn, _messageId, 0,
+                                                    _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+                }
+                catch(DatabaseException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                catch (AMQStoreException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                catch (RuntimeException e)
+                {
+                    e.printStackTrace();
+                    throw e;
+                }
+                finally
+                {
+                    _metaData = null;
+                    _data = null;
+                }
+            }
+        }
+
+        public synchronized StoreFuture flushToStore()
+        {
+            if(_metaData != null)
+            {
+                com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
+                store(txn);
+                BDBMessageStore.this.commit(txn,true);
+
             }
             return IMMEDIATE_FUTURE;
         }
 
         public void remove()
         {
-            flushToStore();
             try
             {
-                BDBMessageStore.this.removeMessage(_messageId);
+                BDBMessageStore.this.removeMessage(_messageId, false);
             }
             catch (AMQStoreException e)
             {
@@ -2094,12 +2144,27 @@ public class BDBMessageStore implements 
             }
         }
 
-        public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+        public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+        {
+            if(message.getStoredMessage() instanceof StoredBDBMessage)
+            {
+                ((StoredBDBMessage)message.getStoredMessage()).store(_txn);
+            }
+
+            BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
+        }
+
+        public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+        {
+            BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
+        }
+
+        public void enqueueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException
         {
             BDBMessageStore.this.enqueueMessage(_txn, queue, messageId);
         }
 
-        public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+        public void dequeueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException
         {
             BDBMessageStore.this.dequeueMessage(_txn, queue, messageId);
 

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java Wed Dec 28 13:02:41 2011
@@ -33,7 +33,7 @@ public class QueueEntryTB extends TupleB
     public QueueEntryKey entryToObject(TupleInput tupleInput)
     {
         AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
-        Long messageId = tupleInput.readLong();
+        long messageId = tupleInput.readLong();
 
         return new QueueEntryKey(queueName, messageId);
     }

Propchange: qpid/trunk/qpid/java/bdbstore/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Wed Dec 28 13:02:41 2011
@@ -32,13 +32,11 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.message.*;
 import org.apache.qpid.server.store.MessageMetaDataType;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLog;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.Header;
@@ -100,7 +98,7 @@ public class BDBMessageStoreTest extends
          */        
         MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
         DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
-        Header header_0_10 = new Header(msgProps_0_10, delProps_0_10);
+        Header header_0_10 = new Header(delProps_0_10, msgProps_0_10);
 
         MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, 
                 MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
@@ -162,7 +160,7 @@ public class BDBMessageStoreTest extends
 
         assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime());
 
-        DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().get(DeliveryProperties.class);
+        DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties();
         assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10);
         assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate());
         assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey());
@@ -170,7 +168,7 @@ public class BDBMessageStoreTest extends
         assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration());
         assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority());
 
-        MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().get(MessageProperties.class);
+        MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties();
         assertNotNull("MessageProperties were not returned", returnedMsgProps);
         assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId()));
         assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength());
@@ -352,7 +350,7 @@ public class BDBMessageStoreTest extends
      */
     public void testTranCommit() throws Exception
     {
-        TransactionLog log = getVirtualHost().getTransactionLog();
+        MessageStore log = getVirtualHost().getMessageStore();
 
         BDBMessageStore bdbStore = assertBDBStore(log);
 
@@ -366,10 +364,10 @@ public class BDBMessageStoreTest extends
             }
         };
         
-        TransactionLog.Transaction txn = log.newTransaction();
+        MessageStore.Transaction txn = log.newTransaction();
         
-        txn.enqueueMessage(mockQueue, 1L);
-        txn.enqueueMessage(mockQueue, 5L);
+        txn.enqueueMessage(mockQueue, new MockMessage(1L));
+        txn.enqueueMessage(mockQueue, new MockMessage(5L));
         txn.commitTran();
 
         List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
@@ -390,7 +388,7 @@ public class BDBMessageStoreTest extends
      */
     public void testTranRollbackBeforeCommit() throws Exception
     {
-        TransactionLog log = getVirtualHost().getTransactionLog();
+        MessageStore log = getVirtualHost().getMessageStore();
 
         BDBMessageStore bdbStore = assertBDBStore(log);
 
@@ -404,14 +402,14 @@ public class BDBMessageStoreTest extends
             }
         };
         
-        TransactionLog.Transaction txn = log.newTransaction();
+        MessageStore.Transaction txn = log.newTransaction();
         
-        txn.enqueueMessage(mockQueue, 21L);
+        txn.enqueueMessage(mockQueue, new MockMessage(21L));
         txn.abortTran();
         
         txn = log.newTransaction();
-        txn.enqueueMessage(mockQueue, 22L);
-        txn.enqueueMessage(mockQueue, 23L);
+        txn.enqueueMessage(mockQueue, new MockMessage(22L));
+        txn.enqueueMessage(mockQueue, new MockMessage(23L));
         txn.commitTran();
 
         List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
@@ -431,7 +429,7 @@ public class BDBMessageStoreTest extends
      */
     public void testTranRollbackAfterCommit() throws Exception
     {
-        TransactionLog log = getVirtualHost().getTransactionLog();
+        MessageStore log = getVirtualHost().getMessageStore();
 
         BDBMessageStore bdbStore = assertBDBStore(log);
 
@@ -445,17 +443,17 @@ public class BDBMessageStoreTest extends
             }
         };
         
-        TransactionLog.Transaction txn = log.newTransaction();
+        MessageStore.Transaction txn = log.newTransaction();
         
-        txn.enqueueMessage(mockQueue, 30L);
+        txn.enqueueMessage(mockQueue, new MockMessage(30L));
         txn.commitTran();
 
         txn = log.newTransaction();
-        txn.enqueueMessage(mockQueue, 31L);
+        txn.enqueueMessage(mockQueue, new MockMessage(31L));
         txn.abortTran();
         
         txn = log.newTransaction();
-        txn.enqueueMessage(mockQueue, 32L);
+        txn.enqueueMessage(mockQueue, new MockMessage(32L));
         txn.commitTran();
         
         List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
@@ -467,4 +465,73 @@ public class BDBMessageStoreTest extends
         assertEquals("Second Message is incorrect", 32L, val.longValue());
     }
 
+    private static class MockMessage implements ServerMessage, EnqueableMessage
+    {
+        private long _messageId;
+
+        public MockMessage(long messageId)
+        {
+            _messageId = messageId;
+        }
+
+        public String getRoutingKey()
+        {
+            return null;
+        }
+
+        public AMQMessageHeader getMessageHeader()
+        {
+            return null;
+        }
+
+        public StoredMessage getStoredMessage()
+        {
+            return null;
+        }
+
+        public boolean isPersistent()
+        {
+            return true;
+        }
+
+        public long getSize()
+        {
+            return 0;
+        }
+
+        public boolean isImmediate()
+        {
+            return false;
+        }
+
+        public long getExpiration()
+        {
+            return 0;
+        }
+
+        public MessageReference newReference()
+        {
+            return null;
+        }
+
+        public long getMessageNumber()
+        {
+            return _messageId;
+        }
+
+        public long getArrivalTime()
+        {
+            return 0;
+        }
+
+        public int getContent(ByteBuffer buf, int offset)
+        {
+            return 0;
+        }
+
+        public ByteBuffer getContent(int offset, int length)
+        {
+            return null;
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java Wed Dec 28 13:02:41 2011
@@ -52,7 +52,9 @@ import org.apache.qpid.framing.abstracti
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
 import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
@@ -415,7 +417,7 @@ public class BDBUpgradeTest extends Qpid
             ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize);
 
             // add content entry to database
-            long messageId = store.getNewMessageId();
+            final long messageId = store.getNewMessageId();
             TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
             MessageContentKey contentKey = null;
             if (storeVersion == VERSION_4)
@@ -451,9 +453,29 @@ public class BDBUpgradeTest extends Qpid
                     return queueName.asString();
                 }
             };
-            TransactionLog log = (TransactionLog) store;
-            TransactionLog.Transaction txn = log.newTransaction();
-            txn.enqueueMessage(mockQueue, messageId);
+
+            EnqueableMessage mockMessage = new EnqueableMessage()
+            {
+    
+                public long getMessageNumber()
+                {
+                    return messageId;
+                }
+
+                public boolean isPersistent()
+                {
+                    return true;
+                }
+
+                public StoredMessage getStoredMessage()
+                {
+                    return null;
+                }
+            };
+
+            MessageStore log = (MessageStore) store;
+            MessageStore.Transaction txn = log.newTransaction();
+            txn.enqueueMessage(mockQueue, mockMessage);
             txn.commitTran();
         }
         finally

Propchange: qpid/trunk/qpid/java/broker-plugins/access-control/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Propchange: qpid/trunk/qpid/java/broker-plugins/access-control/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Propchange: qpid/trunk/qpid/java/broker-plugins/experimental/info/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Propchange: qpid/trunk/qpid/java/broker-plugins/experimental/info/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Propchange: qpid/trunk/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Propchange: qpid/trunk/qpid/java/broker-plugins/experimental/shutdown/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Propchange: qpid/trunk/qpid/java/broker-plugins/extras/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Propchange: qpid/trunk/qpid/java/broker-plugins/extras/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Propchange: qpid/trunk/qpid/java/broker-plugins/firewall/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Propchange: qpid/trunk/qpid/java/broker-plugins/firewall/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Propchange: qpid/trunk/qpid/java/broker/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java Wed Dec 28 13:02:41 2011
@@ -111,6 +111,11 @@ public class ManagementExchange implemen
 
         }
 
+        public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
+        {
+            enqueue(message);
+        }
+
         public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
         {
             enqueue(message);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java Wed Dec 28 13:02:41 2011
@@ -32,6 +32,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.management.common.mbeans.ManagedConnection;
 
 import java.util.ArrayList;
+import java.util.List;
 
 public class QMFBrokerRequestCommand extends QMFCommand
 {
@@ -57,7 +58,7 @@ public class QMFBrokerRequestCommand ext
             QMFMessage responseMessage = new QMFMessage(queueName, cmd);
 
 
-            ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+            List<? extends BaseQueue> queues = exchange.route(responseMessage);
 
 
             for(BaseQueue q : queues)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java Wed Dec 28 13:02:41 2011
@@ -31,6 +31,7 @@ import org.apache.qpid.AMQException;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 public class QMFClassQueryCommand extends QMFCommand
 {
@@ -71,7 +72,7 @@ public class QMFClassQueryCommand extend
 
             Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
 
-            ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+            List<? extends BaseQueue> queues = exchange.route(responseMessage);
 
             for(BaseQueue q : queues)
             {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java Wed Dec 28 13:02:41 2011
@@ -163,7 +163,7 @@ public class QMFGetQueryCommand extends 
 
             Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
 
-            ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+            List<? extends BaseQueue> queues = exchange.route(responseMessage);
 
             for(BaseQueue q : queues)
             {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java Wed Dec 28 13:02:41 2011
@@ -21,8 +21,11 @@
 
 package org.apache.qpid.qmf;
 
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.configuration.SessionConfig;
 import org.apache.qpid.server.message.*;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.transport.codec.BBEncoder;
 
 import java.nio.ByteBuffer;
@@ -59,11 +62,21 @@ public class QMFMessage implements Serve
         return _routingKey;
     }
 
+    public AMQShortString getRoutingKeyShortString()
+    {
+        return AMQShortString.valueOf(_routingKey);
+    }
+
     public AMQMessageHeader getMessageHeader()
     {
         return this;
     }
 
+    public StoredMessage getStoredMessage()
+    {
+        throw new NotImplementedException();
+    }
+
     public boolean isPersistent()
     {
         return false;
@@ -159,9 +172,9 @@ public class QMFMessage implements Serve
         return new QMFMessageReference(this);
     }
 
-    public Long getMessageNumber()
+    public long getMessageNumber()
     {
-        return null;
+        return 0l;
     }
 
     public long getArrivalTime()
@@ -172,9 +185,9 @@ public class QMFMessage implements Serve
     public int getContent(ByteBuffer buf, int offset)
     {
         ByteBuffer src = _content.duplicate();
-        _content.position(offset);
-        _content = _content.slice();
-        int len = _content.remaining();
+        src.position(offset);
+        src = src.slice();
+        int len = src.remaining();
         if(len > buf.remaining())
         {
             len = buf.remaining();
@@ -185,6 +198,16 @@ public class QMFMessage implements Serve
         return len;
     }
 
+
+    public ByteBuffer getContent(int offset, int size)
+    {
+        ByteBuffer src = _content.duplicate();
+        src.position(offset);
+        src = src.slice();
+        src.limit(size);
+        return src;
+    }
+
     private static class QMFMessageReference extends MessageReference<QMFMessage>
     {
         public QMFMessageReference(QMFMessage message)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java Wed Dec 28 13:02:41 2011
@@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.AMQException;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.ArrayList;
 
@@ -68,7 +69,7 @@ public class QMFMethodRequestCommand ext
             QMFMessage responseMessage = new QMFMessage(queueName, cmd);
 
 
-            ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+            List<? extends BaseQueue> queues = exchange.route(responseMessage);
 
 
             for(BaseQueue q : queues)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java Wed Dec 28 13:02:41 2011
@@ -31,6 +31,7 @@ import org.apache.qpid.AMQException;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 public class QMFPackageQueryCommand extends QMFCommand
 {
@@ -67,7 +68,7 @@ public class QMFPackageQueryCommand exte
 
             Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
 
-            ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+            List<? extends BaseQueue> queues = exchange.route(responseMessage);
 
 
             for(BaseQueue q : queues)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java Wed Dec 28 13:02:41 2011
@@ -31,6 +31,7 @@ import org.apache.qpid.AMQException;
 
 import java.util.Collection;
 import java.util.ArrayList;
+import java.util.List;
 
 public class QMFSchemaRequestCommand extends QMFCommand
 {
@@ -70,7 +71,7 @@ public class QMFSchemaRequestCommand ext
             QMFMessage responseMessage = new QMFMessage(routingKey, cmd);
 
 
-            ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+            List<? extends BaseQueue> queues = exchange.route(responseMessage);
 
             for(BaseQueue q : queues)
             {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Dec 28 13:02:41 2011
@@ -95,7 +95,7 @@ import java.util.concurrent.atomic.Atomi
 
 public class AMQChannel implements SessionConfig, AMQSessionModel
 {
-    public static final int DEFAULT_PREFETCH = 5000;
+    public static final int DEFAULT_PREFETCH = 4096;
 
     private static final Logger _logger = Logger.getLogger(AMQChannel.class);
 
@@ -166,6 +166,8 @@ public class AMQChannel implements Sessi
     private final UUID _id;
     private long _createTime = System.currentTimeMillis();
 
+    private final ClientDeliveryMethod _clientDeliveryMethod;
+
     public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
             throws AMQException
     {
@@ -183,6 +185,8 @@ public class AMQChannel implements Sessi
 
         // by default the session is non-transactional
         _transaction = new AutoCommitTransaction(_messageStore);
+
+         _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
     }
 
     public ConfigStore getConfigStore()
@@ -205,6 +209,11 @@ public class AMQChannel implements Sessi
         return !(_transaction instanceof AutoCommitTransaction);
     }
 
+    public void receivedComplete()
+    {
+    }
+
+
     public boolean inTransaction()
     {
         return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
@@ -284,7 +293,7 @@ public class AMQChannel implements Sessi
             _currentMessage.setExpiration();
 
 
-            MessageMetaData mmd = _currentMessage.headersReceived();
+            MessageMetaData mmd = _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
             final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(mmd);
             _currentMessage.setStoredMessage(handle);
 
@@ -316,8 +325,7 @@ public class AMQChannel implements Sessi
         {
             try
             {
-                _currentMessage.getStoredMessage().flushToStore();
-                final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
+                final List<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
 
                 if(!checkMessageUserId(_currentMessage.getContentHeader()))
                 {
@@ -339,11 +347,13 @@ public class AMQChannel implements Sessi
                     }
                     else
                     {
-                        _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional()));
+                        _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime());
                         incrementOutstandingTxnsIfNecessary();
 			            updateTransactionalActivity();
                     }
                 }
+                _currentMessage.getStoredMessage().flushToStore();
+
             }
             finally
             {
@@ -857,10 +867,8 @@ public class AMQChannel implements Sessi
     private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
     {
 
-        Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
-        _unacknowledgedMessageMap.collect(deliveryTag, multiple, ackedMessageMap);
-        _unacknowledgedMessageMap.remove(ackedMessageMap);
-        return ackedMessageMap.values();
+        return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
+
     }
 
     /**
@@ -949,12 +957,17 @@ public class AMQChannel implements Sessi
 
     public void commit() throws AMQException
     {
+        commit(null);
+    }
+    public void commit(Runnable immediateAction) throws AMQException
+    {
+
         if (!isTransactional())
         {
             throw new AMQException("Fatal error: commit called on non-transactional channel");
         }
 
-        _transaction.commit();
+        _transaction.commit(immediateAction);
 
         _txnCommits.incrementAndGet();
         _txnStarts.incrementAndGet();
@@ -1033,7 +1046,7 @@ public class AMQChannel implements Sessi
     {
         if (isTransactional())
         {
-            _txnUpdateTime.set(System.currentTimeMillis());
+            _txnUpdateTime.set(getProtocolSession().getLastReceivedTime());
         }
     }
 
@@ -1079,20 +1092,6 @@ public class AMQChannel implements Sessi
         return _messageStore;
     }
 
-    private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod()
-        {
-
-            public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
-                    throws AMQException
-            {
-                _session.registerMessageDelivered(entry.getMessage().getSize());
-                getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
-                                                                               deliveryTag, sub.getConsumerTag());
-                entry.incrementDeliveryCount();
-            }
-
-        };
-
     public ClientDeliveryMethod getClientDeliveryMethod()
     {
         return _clientDeliveryMethod;
@@ -1158,11 +1157,10 @@ public class AMQChannel implements Sessi
     private class MessageDeliveryAction implements ServerTransaction.Action
     {
         private IncomingMessage _incommingMessage;
-        private ArrayList<? extends BaseQueue> _destinationQueues;
+        private List<? extends BaseQueue> _destinationQueues;
 
         public MessageDeliveryAction(IncomingMessage currentMessage,
-                                     ArrayList<? extends BaseQueue> destinationQueues,
-                                     boolean transactional)
+                                     List<? extends BaseQueue> destinationQueues)
         {
             _incommingMessage = currentMessage;
             _destinationQueues = destinationQueues;
@@ -1177,8 +1175,10 @@ public class AMQChannel implements Sessi
                 final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
                 MessageReference ref = amqMessage.newReference();
 
-                for(final BaseQueue queue : _destinationQueues)
+                for(int i = 0; i < _destinationQueues.size(); i++)
                 {
+                    BaseQueue queue = _destinationQueues.get(i);
+
                     BaseQueue.PostEnqueueAction action;
 
                     if(immediate)
@@ -1190,7 +1190,7 @@ public class AMQChannel implements Sessi
                         action = null;
                     }
 
-                    queue.enqueue(amqMessage, action);
+                    queue.enqueue(amqMessage, isTransactional(), action);
 
                     if(queue instanceof AMQQueue)
                     {
@@ -1198,6 +1198,8 @@ public class AMQChannel implements Sessi
                     }
 
                 }
+
+                _incommingMessage.getStoredMessage().flushToStore();
                 ref.release();
             }
             catch (AMQException e)
@@ -1539,7 +1541,7 @@ public class AMQChannel implements Sessi
 
             final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry);
 
-            final ArrayList<? extends BaseQueue> destinationQueues = altExchange.route(m);
+            final List<? extends BaseQueue> destinationQueues = altExchange.route(m);
 
             if (destinationQueues == null || destinationQueues.isEmpty())
             {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java Wed Dec 28 13:02:41 2011
@@ -22,8 +22,8 @@ package org.apache.qpid.server;
 
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.TransactionLog;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.AMQException;
@@ -39,13 +39,13 @@ public class ExtractResendAndRequeue imp
     private final Map<Long, QueueEntry> _msgToResend;
     private final boolean _requeueIfUnabletoResend;
     private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
-    private final TransactionLog _transactionLog;
+    private final MessageStore _transactionLog;
 
     public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
                                    Map<Long, QueueEntry> msgToRequeue,
                                    Map<Long, QueueEntry> msgToResend,
                                    boolean requeueIfUnabletoResend,
-                                   TransactionLog txnLog)
+                                   MessageStore txnLog)
     {
         _unacknowledgedMessageMap = unacknowledgedMessageMap;
         _msgToRequeue = msgToRequeue;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Wed Dec 28 13:02:41 2011
@@ -46,10 +46,6 @@ public interface UnacknowledgedMessageMa
 
     void add(long deliveryTag, QueueEntry message);
 
-    void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs);
-
-    void remove(Map<Long,QueueEntry> msgs);
-
     QueueEntry remove(long deliveryTag);
 
     Collection<QueueEntry> cancelAllMessages();
@@ -67,6 +63,8 @@ public interface UnacknowledgedMessageMa
      */
     Set<Long> getDeliveryTags();
 
+    Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple);
+
 }
 
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Wed Dec 28 13:02:41 2011
@@ -157,6 +157,14 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
+    public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple)
+    {
+        Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
+        collect(deliveryTag, multiple, ackedMessageMap);
+        remove(ackedMessageMap);
+        return ackedMessageMap.values();
+    }
+
     private void collect(long key, Map<Long, QueueEntry> msgs)
     {
         synchronized (_lock)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java Wed Dec 28 13:02:41 2011
@@ -115,4 +115,9 @@ public class Binding
         return result;
     }
 
+    public String toString()
+    {
+        return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+"}";
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Wed Dec 28 13:02:41 2011
@@ -351,11 +351,11 @@ public abstract class AbstractExchange i
 
 
 
-    public final ArrayList<? extends BaseQueue> route(final InboundMessage message)
+    public final List<? extends BaseQueue> route(final InboundMessage message)
     {
         _receivedMessageCount.incrementAndGet();
         _receivedMessageSize.addAndGet(message.getSize());
-        final ArrayList<? extends BaseQueue> queues = doRoute(message);
+        final List<? extends BaseQueue> queues = doRoute(message);
         if(!queues.isEmpty())
         {
             _routedMessageCount.incrementAndGet();
@@ -364,7 +364,7 @@ public abstract class AbstractExchange i
         return queues;
     }
 
-    protected abstract ArrayList<? extends BaseQueue> doRoute(final InboundMessage message);
+    protected abstract List<? extends BaseQueue> doRoute(final InboundMessage message);
 
     public long getMsgReceives()
     {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org