You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/28 14:02:48 UTC
svn commit: r1225178 [1/8] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/
bdbstore/src/test/ bdbstore/src/test/jav...
Author: rgodfrey
Date: Wed Dec 28 13:02:41 2011
New Revision: 1225178
URL: http://svn.apache.org/viewvc?rev=1225178&view=rev
Log:
QPID-3714 : [Java] Performance Improvements
  Persistence:
    Store message in same transaction as enqueue if possible
  Memory:
    Remove unnecessary (un)boxing
    Reduce unnecessary copying of message data
    Cache short strings
    Cache queues for a given routing key on an Exchange
    (0-9) Use a fixed size buffer for preparing frames to write out
  Other:
    Reduce calls to System.currentTimeMillis
    (0-10) Special case immutable RangeSets, in particular RangeSets of a single range/point
    (0-10) Special case delivery properties and message properties in headers
    (0-9) send commit-ok as soon as data committed to store
    Cache publishing access control queries
    (0-9) Optimised long and int typed values for FieldTables
    (0-9) Retain FieldTable encoded form
    (0-9) Cache queue and topic destinations
Added:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java
Modified:
qpid/trunk/qpid/java/ (props changed)
qpid/trunk/qpid/java/bdbstore/src/main/ (props changed)
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
qpid/trunk/qpid/java/bdbstore/src/test/ (props changed)
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
qpid/trunk/qpid/java/broker-plugins/access-control/src/main/ (props changed)
qpid/trunk/qpid/java/broker-plugins/access-control/src/test/ (props changed)
qpid/trunk/qpid/java/broker-plugins/experimental/info/src/main/ (props changed)
qpid/trunk/qpid/java/broker-plugins/experimental/info/src/test/ (props changed)
qpid/trunk/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/test/ (props changed)
qpid/trunk/qpid/java/broker-plugins/experimental/shutdown/src/main/ (props changed)
qpid/trunk/qpid/java/broker-plugins/extras/src/main/ (props changed)
qpid/trunk/qpid/java/broker-plugins/extras/src/test/ (props changed)
qpid/trunk/qpid/java/broker-plugins/firewall/src/main/ (props changed)
qpid/trunk/qpid/java/broker-plugins/firewall/src/test/ (props changed)
qpid/trunk/qpid/java/broker/src/main/ (props changed)
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (contents, props changed)
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (contents, props changed)
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java (contents, props changed)
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
qpid/trunk/qpid/java/broker/src/test/ (props changed)
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
qpid/trunk/qpid/java/broker/src/velocity/ (props changed)
qpid/trunk/qpid/java/client/example/src/main/ (props changed)
qpid/trunk/qpid/java/client/src/main/ (props changed)
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
qpid/trunk/qpid/java/client/src/test/ (props changed)
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/ (props changed)
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
qpid/trunk/qpid/java/client/test/ (props changed)
qpid/trunk/qpid/java/common/Composite.tpl
qpid/trunk/qpid/java/common/src/main/ (props changed)
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
qpid/trunk/qpid/java/common/src/test/ (props changed)
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/test/ (props changed)
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java
qpid/trunk/qpid/java/common/templates/method/version/MethodBodyClass.vm
qpid/trunk/qpid/java/common/templates/model/MethodRegistryClass.vm
qpid/trunk/qpid/java/common/templates/model/version/MethodRegistryClass.vm
qpid/trunk/qpid/java/integrationtests/src/main/ (props changed)
qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/test/ (props changed)
qpid/trunk/qpid/java/junit-toolkit/src/ (props changed)
qpid/trunk/qpid/java/junit-toolkit/src/main/ (props changed)
qpid/trunk/qpid/java/management/common/src/main/ (props changed)
qpid/trunk/qpid/java/management/common/src/test/ (props changed)
qpid/trunk/qpid/java/management/eclipse-plugin/src/main/ (props changed)
qpid/trunk/qpid/java/management/eclipse-plugin/src/test/ (props changed)
qpid/trunk/qpid/java/management/example/src/main/ (props changed)
qpid/trunk/qpid/java/perftests/src/main/ (props changed)
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/test/ (props changed)
qpid/trunk/qpid/java/systests/src/main/ (props changed)
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/ (props changed)
qpid/trunk/qpid/java/testkit/src/main/ (props changed)
qpid/trunk/qpid/java/tools/src/main/ (props changed)
Propchange: qpid/trunk/qpid/java/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -1,9 +1,8 @@
build.overrides
-blaze.ipr
-blaze.iws
+*.ipr
+*.iws
build
-intellijclasses
-qpid.iml
-qpid.ipr
-qpid.iws
+*.iml
+*.iws
work
+
Propchange: qpid/trunk/qpid/java/bdbstore/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Wed Dec 28 13:02:41 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.ber
import java.io.File;
import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -32,6 +33,8 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.*;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
@@ -43,6 +46,7 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -70,17 +74,6 @@ import org.apache.qpid.server.store.berk
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.TransactionConfig;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -91,7 +84,7 @@ import com.sleepycat.je.TransactionConfi
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
@SuppressWarnings({"unchecked"})
-public class BDBMessageStore implements MessageStore
+public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
@@ -205,18 +198,15 @@ public class BDBMessageStore implements
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
- _logSubject = logSubject;
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
+ CurrentActor.get().message(logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
- if(_configured)
+ if(!_configured)
{
- throw new Exception("ConfigStore already configured");
+ _logSubject = logSubject;
+ configure(name,storeConfiguration);
+ _configured = true;
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
}
-
- configure(name,storeConfiguration);
-
- _configured = true;
- stateTransition(State.CONFIGURING, State.CONFIGURED);
recover(recoveryHandler);
stateTransition(State.RECOVERING, State.STARTED);
@@ -227,24 +217,31 @@ public class BDBMessageStore implements
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+ CurrentActor.get().message(logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
if(!_configured)
{
- throw new Exception("ConfigStore not configured");
+ _logSubject = logSubject;
+ configure(name,storeConfiguration);
+ _configured = true;
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
}
-
+
recoverMessages(recoveryHandler);
}
public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
Configuration storeConfiguration, LogSubject logSubject) throws Exception
{
- CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
+ CurrentActor.get().message(logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
+
if(!_configured)
{
- throw new Exception("ConfigStore not configured");
+ _logSubject = logSubject;
+ configure(name,storeConfiguration);
+ _configured = true;
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
}
recoverQueueEntries(recoveryHandler);
@@ -252,7 +249,7 @@ public class BDBMessageStore implements
}
- public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction()
+ public org.apache.qpid.server.store.MessageStore.Transaction newTransaction()
{
return new BDBTransaction();
}
@@ -686,8 +683,6 @@ public class BDBMessageStore implements
{
cursor = _messageMetaDataDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);;
-
DatabaseEntry value = new DatabaseEntry();
EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance();
@@ -695,7 +690,7 @@ public class BDBMessageStore implements
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- long messageId = (Long) keyBinding.entryToObject(key);
+ long messageId = LongBinding.entryToLong(key);
StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value);
StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
@@ -781,10 +776,15 @@ public class BDBMessageStore implements
*
* @param messageId Identifies the message to remove.
*
- * @throws AMQInternalException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- public void removeMessage(Long messageId) throws AMQStoreException
+ public void removeMessage(long messageId) throws AMQStoreException
+ {
+ removeMessage(messageId, true);
+ }
+ public void removeMessage(long messageId, boolean sync) throws AMQStoreException
{
+
// _log.debug("public void removeMessage(Long messageId = " + messageId): called");
com.sleepycat.je.Transaction tx = null;
@@ -796,8 +796,7 @@ public class BDBMessageStore implements
//remove the message meta data from the store
DatabaseEntry key = new DatabaseEntry();
- EntryBinding metaKeyBindingTuple = TupleBinding.getPrimitiveBinding(Long.class);
- metaKeyBindingTuple.objectToEntry(messageId, key);
+ LongBinding.longToEntry(messageId, key);
if (_log.isDebugEnabled())
{
@@ -808,9 +807,8 @@ public class BDBMessageStore implements
OperationStatus status = _messageMetaDataDb.delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
- tx.abort();
-
- throw new AMQStoreException("Message metadata not found for message id " + messageId);
+ _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
+ messageId);
}
if (_log.isDebugEnabled())
@@ -868,7 +866,7 @@ public class BDBMessageStore implements
cursor.close();
cursor = null;
- commit(tx, true);
+ commit(tx, sync);
}
catch (DatabaseException e)
{
@@ -1174,11 +1172,12 @@ public class BDBMessageStore implements
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ long messageId) throws AMQStoreException
{
// _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called");
- AMQShortString name = new AMQShortString(queue.getResourceName());
+ AMQShortString name = AMQShortString.valueOf(queue.getResourceName());
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new QueueEntryTB();
@@ -1212,7 +1211,8 @@ public class BDBMessageStore implements
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ long messageId) throws AMQStoreException
{
AMQShortString name = new AMQShortString(queue.getResourceName());
@@ -1383,7 +1383,7 @@ public class BDBMessageStore implements
*
* @return A fresh message id.
*/
- public Long getNewMessageId()
+ public long getNewMessageId()
{
return _messageId.incrementAndGet();
}
@@ -1398,7 +1398,7 @@ public class BDBMessageStore implements
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int offset,
+ protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
ByteBuffer contentBody) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
@@ -1436,7 +1436,8 @@ public class BDBMessageStore implements
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId, StorableMessageMetaData messageMetaData)
+ private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
+ StorableMessageMetaData messageMetaData)
throws AMQStoreException
{
if (_log.isDebugEnabled())
@@ -1446,8 +1447,7 @@ public class BDBMessageStore implements
}
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
- keyBinding.objectToEntry(messageId, key);
+ LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
@@ -1475,7 +1475,7 @@ public class BDBMessageStore implements
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQStoreException
+ public StorableMessageMetaData getMessageMetaData(long messageId) throws AMQStoreException
{
if (_log.isDebugEnabled())
{
@@ -1484,8 +1484,7 @@ public class BDBMessageStore implements
}
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
- keyBinding.objectToEntry(messageId, key);
+ LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
@@ -1519,7 +1518,7 @@ public class BDBMessageStore implements
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQStoreException
+ public int getContent(long messageId, int offset, ByteBuffer dst) throws AMQStoreException
{
DatabaseEntry contentKeyEntry = new DatabaseEntry();
@@ -1778,7 +1777,6 @@ public class BDBMessageStore implements
{
_log.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
}
-
_complete = true;
notifyAll();
@@ -1799,7 +1797,7 @@ public class BDBMessageStore implements
{
//_log.debug("public void commit(): called");
- _commitThread.addJob(this);
+ _commitThread.addJob(this, _syncCommit);
if(!_syncCommit)
{
@@ -1807,28 +1805,14 @@ public class BDBMessageStore implements
return;
}
- synchronized (BDBCommitFuture.this)
- {
- while (!_complete)
- {
- try
- {
- wait(250);
- }
- catch (InterruptedException e)
- {
- // _log.error("Unexpected thread interruption: " + e, e);
- throw new RuntimeException(e);
- }
- }
+ waitForCompletion();
+ // _log.debug("Commit completed, _databaseException = " + _databaseException);
- // _log.debug("Commit completed, _databaseException = " + _databaseException);
-
- if (_databaseException != null)
- {
- throw _databaseException;
- }
+ if (_databaseException != null)
+ {
+ throw _databaseException;
}
+
}
public synchronized boolean isComplete()
@@ -1836,10 +1820,11 @@ public class BDBMessageStore implements
return _complete;
}
- public void waitForCompletion()
+ public synchronized void waitForCompletion()
{
while (!isComplete())
{
+ _commitThread.explicitNotify();
try
{
wait(250);
@@ -1866,7 +1851,7 @@ public class BDBMessageStore implements
// private final Logger _log = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final AtomicReference<Queue<BDBCommitFuture>> _jobQueue = new AtomicReference<Queue<BDBCommitFuture>>(new ConcurrentLinkedQueue<BDBCommitFuture>());
+ private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
private final CheckpointConfig _config = new CheckpointConfig();
private final Object _lock = new Object();
@@ -1877,6 +1862,14 @@ public class BDBMessageStore implements
}
+ public void explicitNotify()
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+
public void run()
{
while (!_stopped.get())
@@ -1905,24 +1898,25 @@ public class BDBMessageStore implements
{
// _log.debug("private void processJobs(): called");
- // we replace the old queue atomically with a new one and this avoids any need to
- // copy elements out of the queue
- Queue<BDBCommitFuture> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<BDBCommitFuture>());
+ int size = _jobQueue.size();
try
{
- // _environment.checkpoint(_config);
+ //TODO - upgrade to BDB 5.0, then use: _environment.flushLog(true);
_environment.sync();
- for (BDBCommitFuture commit : jobs)
+ for(int i = 0; i < size; i++)
{
+ BDBCommitFuture commit = _jobQueue.poll();
commit.complete();
}
+
}
catch (DatabaseException e)
{
- for (BDBCommitFuture commit : jobs)
+ for(int i = 0; i < size; i++)
{
+ BDBCommitFuture commit = _jobQueue.poll();
commit.abort(e);
}
}
@@ -1931,15 +1925,19 @@ public class BDBMessageStore implements
private boolean hasJobs()
{
- return !_jobQueue.get().isEmpty();
+ return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFuture commit)
+ public void addJob(BDBCommitFuture commit, final boolean sync)
{
- synchronized (_lock)
+
+ _jobQueue.add(commit);
+ if(sync)
{
- _jobQueue.get().add(commit);
- _lock.notifyAll();
+ synchronized (_lock)
+ {
+ _lock.notifyAll();
+ }
}
}
@@ -1959,7 +1957,10 @@ public class BDBMessageStore implements
private final long _messageId;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
- private com.sleepycat.je.Transaction _txn;
+
+ private StorableMessageMetaData _metaData;
+ private volatile SoftReference<byte[]> _dataRef;
+ private byte[] _data;
StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
{
@@ -1973,22 +1974,15 @@ public class BDBMessageStore implements
try
{
_messageId = messageId;
+ _metaData = metaData;
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- if(persist)
- {
- _txn = _environment.beginTransaction(null, null);
- storeMetaData(_txn, messageId, metaData);
- }
+
}
catch (DatabaseException e)
{
throw new RuntimeException(e);
}
- catch (AMQStoreException e)
- {
- throw new RuntimeException(e);
- }
}
@@ -2018,58 +2012,114 @@ public class BDBMessageStore implements
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
- try
+ src = src.slice();
+
+ if(_data == null)
{
- BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src);
+ _data = new byte[src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+ src.duplicate().get(_data);
}
- catch (AMQStoreException e)
+ else
{
- throw new RuntimeException(e);
+ byte[] oldData = _data;
+ _data = new byte[oldData.length + src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+
+ System.arraycopy(oldData,0,_data,0,oldData.length);
+ src.duplicate().get(_data, oldData.length, src.remaining());
}
+
}
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
- try
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
{
- return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
}
- catch (AMQStoreException e)
+ else
{
- throw new RuntimeException(e);
+ try
+ {
+ return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
- public StoreFuture flushToStore()
+ public ByteBuffer getContent(int offsetInMessage, int size)
{
- try
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
{
- if(_txn != null)
- {
- //if(_log.isDebugEnabled())
- //{
- // _log.debug("Flushing message " + _messageId + " to store");
- //}
- BDBMessageStore.this.commitTranImpl(_txn, true);
- }
+ return ByteBuffer.wrap(data,offsetInMessage,size);
}
- catch (AMQStoreException e)
+ else
{
- throw new RuntimeException(e);
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ getContent(offsetInMessage, buf);
+ buf.position(0);
+ return buf;
}
- finally
+ }
+
+ synchronized void store(com.sleepycat.je.Transaction txn)
+ {
+
+ if(_metaData != null)
{
- _txn = null;
+ try
+ {
+ _dataRef = new SoftReference<byte[]>(_data);
+ BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
+ BDBMessageStore.this.addContent(txn, _messageId, 0,
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ }
+ catch(DatabaseException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ _metaData = null;
+ _data = null;
+ }
+ }
+ }
+
+ public synchronized StoreFuture flushToStore()
+ {
+ if(_metaData != null)
+ {
+ com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
+ store(txn);
+ BDBMessageStore.this.commit(txn,true);
+
}
return IMMEDIATE_FUTURE;
}
public void remove()
{
- flushToStore();
try
{
- BDBMessageStore.this.removeMessage(_messageId);
+ BDBMessageStore.this.removeMessage(_messageId, false);
}
catch (AMQStoreException e)
{
@@ -2094,12 +2144,27 @@ public class BDBMessageStore implements
}
}
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ {
+ if(message.getStoredMessage() instanceof StoredBDBMessage)
+ {
+ ((StoredBDBMessage)message.getStoredMessage()).store(_txn);
+ }
+
+ BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ {
+ BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ public void enqueueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException
{
BDBMessageStore.this.enqueueMessage(_txn, queue, messageId);
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException
{
BDBMessageStore.this.dequeueMessage(_txn, queue, messageId);
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java Wed Dec 28 13:02:41 2011
@@ -33,7 +33,7 @@ public class QueueEntryTB extends TupleB
public QueueEntryKey entryToObject(TupleInput tupleInput)
{
AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
- Long messageId = tupleInput.readLong();
+ long messageId = tupleInput.readLong();
return new QueueEntryKey(queueName, messageId);
}
Propchange: qpid/trunk/qpid/java/bdbstore/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Wed Dec 28 13:02:41 2011
@@ -32,13 +32,11 @@ import org.apache.qpid.framing.ContentHe
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.message.*;
import org.apache.qpid.server.store.MessageMetaDataType;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -100,7 +98,7 @@ public class BDBMessageStoreTest extends
*/
MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
- Header header_0_10 = new Header(msgProps_0_10, delProps_0_10);
+ Header header_0_10 = new Header(delProps_0_10, msgProps_0_10);
MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
@@ -162,7 +160,7 @@ public class BDBMessageStoreTest extends
assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime());
- DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().get(DeliveryProperties.class);
+ DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties();
assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10);
assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate());
assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey());
@@ -170,7 +168,7 @@ public class BDBMessageStoreTest extends
assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration());
assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority());
- MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().get(MessageProperties.class);
+ MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties();
assertNotNull("MessageProperties were not returned", returnedMsgProps);
assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId()));
assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength());
@@ -352,7 +350,7 @@ public class BDBMessageStoreTest extends
*/
public void testTranCommit() throws Exception
{
- TransactionLog log = getVirtualHost().getTransactionLog();
+ MessageStore log = getVirtualHost().getMessageStore();
BDBMessageStore bdbStore = assertBDBStore(log);
@@ -366,10 +364,10 @@ public class BDBMessageStoreTest extends
}
};
- TransactionLog.Transaction txn = log.newTransaction();
+ MessageStore.Transaction txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 1L);
- txn.enqueueMessage(mockQueue, 5L);
+ txn.enqueueMessage(mockQueue, new MockMessage(1L));
+ txn.enqueueMessage(mockQueue, new MockMessage(5L));
txn.commitTran();
List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
@@ -390,7 +388,7 @@ public class BDBMessageStoreTest extends
*/
public void testTranRollbackBeforeCommit() throws Exception
{
- TransactionLog log = getVirtualHost().getTransactionLog();
+ MessageStore log = getVirtualHost().getMessageStore();
BDBMessageStore bdbStore = assertBDBStore(log);
@@ -404,14 +402,14 @@ public class BDBMessageStoreTest extends
}
};
- TransactionLog.Transaction txn = log.newTransaction();
+ MessageStore.Transaction txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 21L);
+ txn.enqueueMessage(mockQueue, new MockMessage(21L));
txn.abortTran();
txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 22L);
- txn.enqueueMessage(mockQueue, 23L);
+ txn.enqueueMessage(mockQueue, new MockMessage(22L));
+ txn.enqueueMessage(mockQueue, new MockMessage(23L));
txn.commitTran();
List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
@@ -431,7 +429,7 @@ public class BDBMessageStoreTest extends
*/
public void testTranRollbackAfterCommit() throws Exception
{
- TransactionLog log = getVirtualHost().getTransactionLog();
+ MessageStore log = getVirtualHost().getMessageStore();
BDBMessageStore bdbStore = assertBDBStore(log);
@@ -445,17 +443,17 @@ public class BDBMessageStoreTest extends
}
};
- TransactionLog.Transaction txn = log.newTransaction();
+ MessageStore.Transaction txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 30L);
+ txn.enqueueMessage(mockQueue, new MockMessage(30L));
txn.commitTran();
txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 31L);
+ txn.enqueueMessage(mockQueue, new MockMessage(31L));
txn.abortTran();
txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 32L);
+ txn.enqueueMessage(mockQueue, new MockMessage(32L));
txn.commitTran();
List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
@@ -467,4 +465,73 @@ public class BDBMessageStoreTest extends
assertEquals("Second Message is incorrect", 32L, val.longValue());
}
+ private static class MockMessage implements ServerMessage, EnqueableMessage
+ {
+ private long _messageId;
+
+ public MockMessage(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public String getRoutingKey()
+ {
+ return null;
+ }
+
+ public AMQMessageHeader getMessageHeader()
+ {
+ return null;
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ return null;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public long getSize()
+ {
+ return 0;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public long getExpiration()
+ {
+ return 0;
+ }
+
+ public MessageReference newReference()
+ {
+ return null;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public long getArrivalTime()
+ {
+ return 0;
+ }
+
+ public int getContent(ByteBuffer buf, int offset)
+ {
+ return 0;
+ }
+
+ public ByteBuffer getContent(int offset, int length)
+ {
+ return null;
+ }
+ }
}
Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java Wed Dec 28 13:02:41 2011
@@ -52,7 +52,9 @@ import org.apache.qpid.framing.abstracti
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
@@ -415,7 +417,7 @@ public class BDBUpgradeTest extends Qpid
ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize);
// add content entry to database
- long messageId = store.getNewMessageId();
+ final long messageId = store.getNewMessageId();
TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
MessageContentKey contentKey = null;
if (storeVersion == VERSION_4)
@@ -451,9 +453,29 @@ public class BDBUpgradeTest extends Qpid
return queueName.asString();
}
};
- TransactionLog log = (TransactionLog) store;
- TransactionLog.Transaction txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, messageId);
+
+ EnqueableMessage mockMessage = new EnqueableMessage()
+ {
+
+ public long getMessageNumber()
+ {
+ return messageId;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ return null;
+ }
+ };
+
+ MessageStore log = (MessageStore) store;
+ MessageStore.Transaction txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, mockMessage);
txn.commitTran();
}
finally
Propchange: qpid/trunk/qpid/java/broker-plugins/access-control/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Propchange: qpid/trunk/qpid/java/broker-plugins/access-control/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Propchange: qpid/trunk/qpid/java/broker-plugins/experimental/info/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Propchange: qpid/trunk/qpid/java/broker-plugins/experimental/info/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Propchange: qpid/trunk/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Propchange: qpid/trunk/qpid/java/broker-plugins/experimental/shutdown/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Propchange: qpid/trunk/qpid/java/broker-plugins/extras/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Propchange: qpid/trunk/qpid/java/broker-plugins/extras/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Propchange: qpid/trunk/qpid/java/broker-plugins/firewall/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Propchange: qpid/trunk/qpid/java/broker-plugins/firewall/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Propchange: qpid/trunk/qpid/java/broker/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java Wed Dec 28 13:02:41 2011
@@ -111,6 +111,11 @@ public class ManagementExchange implemen
}
+ public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
+ {
+ enqueue(message);
+ }
+
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
enqueue(message);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java Wed Dec 28 13:02:41 2011
@@ -32,6 +32,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.management.common.mbeans.ManagedConnection;
import java.util.ArrayList;
+import java.util.List;
public class QMFBrokerRequestCommand extends QMFCommand
{
@@ -57,7 +58,7 @@ public class QMFBrokerRequestCommand ext
QMFMessage responseMessage = new QMFMessage(queueName, cmd);
- ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+ List<? extends BaseQueue> queues = exchange.route(responseMessage);
for(BaseQueue q : queues)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java Wed Dec 28 13:02:41 2011
@@ -31,6 +31,7 @@ import org.apache.qpid.AMQException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
public class QMFClassQueryCommand extends QMFCommand
{
@@ -71,7 +72,7 @@ public class QMFClassQueryCommand extend
Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
- ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+ List<? extends BaseQueue> queues = exchange.route(responseMessage);
for(BaseQueue q : queues)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java Wed Dec 28 13:02:41 2011
@@ -163,7 +163,7 @@ public class QMFGetQueryCommand extends
Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
- ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+ List<? extends BaseQueue> queues = exchange.route(responseMessage);
for(BaseQueue q : queues)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java Wed Dec 28 13:02:41 2011
@@ -21,8 +21,11 @@
package org.apache.qpid.qmf;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.message.*;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.transport.codec.BBEncoder;
import java.nio.ByteBuffer;
@@ -59,11 +62,21 @@ public class QMFMessage implements Serve
return _routingKey;
}
+ public AMQShortString getRoutingKeyShortString()
+ {
+ return AMQShortString.valueOf(_routingKey);
+ }
+
public AMQMessageHeader getMessageHeader()
{
return this;
}
+ public StoredMessage getStoredMessage()
+ {
+ throw new NotImplementedException();
+ }
+
public boolean isPersistent()
{
return false;
@@ -159,9 +172,9 @@ public class QMFMessage implements Serve
return new QMFMessageReference(this);
}
- public Long getMessageNumber()
+ public long getMessageNumber()
{
- return null;
+ return 0l;
}
public long getArrivalTime()
@@ -172,9 +185,9 @@ public class QMFMessage implements Serve
public int getContent(ByteBuffer buf, int offset)
{
ByteBuffer src = _content.duplicate();
- _content.position(offset);
- _content = _content.slice();
- int len = _content.remaining();
+ src.position(offset);
+ src = src.slice();
+ int len = src.remaining();
if(len > buf.remaining())
{
len = buf.remaining();
@@ -185,6 +198,16 @@ public class QMFMessage implements Serve
return len;
}
+
+ public ByteBuffer getContent(int offset, int size)
+ {
+ ByteBuffer src = _content.duplicate();
+ src.position(offset);
+ src = src.slice();
+ src.limit(size);
+ return src;
+ }
+
private static class QMFMessageReference extends MessageReference<QMFMessage>
{
public QMFMessageReference(QMFMessage message)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java Wed Dec 28 13:02:41 2011
@@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.AMQException;
+import java.util.List;
import java.util.UUID;
import java.util.ArrayList;
@@ -68,7 +69,7 @@ public class QMFMethodRequestCommand ext
QMFMessage responseMessage = new QMFMessage(queueName, cmd);
- ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+ List<? extends BaseQueue> queues = exchange.route(responseMessage);
for(BaseQueue q : queues)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java Wed Dec 28 13:02:41 2011
@@ -31,6 +31,7 @@ import org.apache.qpid.AMQException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
public class QMFPackageQueryCommand extends QMFCommand
{
@@ -67,7 +68,7 @@ public class QMFPackageQueryCommand exte
Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
- ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+ List<? extends BaseQueue> queues = exchange.route(responseMessage);
for(BaseQueue q : queues)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java Wed Dec 28 13:02:41 2011
@@ -31,6 +31,7 @@ import org.apache.qpid.AMQException;
import java.util.Collection;
import java.util.ArrayList;
+import java.util.List;
public class QMFSchemaRequestCommand extends QMFCommand
{
@@ -70,7 +71,7 @@ public class QMFSchemaRequestCommand ext
QMFMessage responseMessage = new QMFMessage(routingKey, cmd);
- ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+ List<? extends BaseQueue> queues = exchange.route(responseMessage);
for(BaseQueue q : queues)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Dec 28 13:02:41 2011
@@ -95,7 +95,7 @@ import java.util.concurrent.atomic.Atomi
public class AMQChannel implements SessionConfig, AMQSessionModel
{
- public static final int DEFAULT_PREFETCH = 5000;
+ public static final int DEFAULT_PREFETCH = 4096;
private static final Logger _logger = Logger.getLogger(AMQChannel.class);
@@ -166,6 +166,8 @@ public class AMQChannel implements Sessi
private final UUID _id;
private long _createTime = System.currentTimeMillis();
+ private final ClientDeliveryMethod _clientDeliveryMethod;
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
@@ -183,6 +185,8 @@ public class AMQChannel implements Sessi
// by default the session is non-transactional
_transaction = new AutoCommitTransaction(_messageStore);
+
+ _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
}
public ConfigStore getConfigStore()
@@ -205,6 +209,11 @@ public class AMQChannel implements Sessi
return !(_transaction instanceof AutoCommitTransaction);
}
+ public void receivedComplete()
+ {
+ }
+
+
public boolean inTransaction()
{
return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
@@ -284,7 +293,7 @@ public class AMQChannel implements Sessi
_currentMessage.setExpiration();
- MessageMetaData mmd = _currentMessage.headersReceived();
+ MessageMetaData mmd = _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(mmd);
_currentMessage.setStoredMessage(handle);
@@ -316,8 +325,7 @@ public class AMQChannel implements Sessi
{
try
{
- _currentMessage.getStoredMessage().flushToStore();
- final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
+ final List<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
if(!checkMessageUserId(_currentMessage.getContentHeader()))
{
@@ -339,11 +347,13 @@ public class AMQChannel implements Sessi
}
else
{
- _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional()));
+ _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime());
incrementOutstandingTxnsIfNecessary();
updateTransactionalActivity();
}
}
+ _currentMessage.getStoredMessage().flushToStore();
+
}
finally
{
@@ -857,10 +867,8 @@ public class AMQChannel implements Sessi
private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
{
- Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
- _unacknowledgedMessageMap.collect(deliveryTag, multiple, ackedMessageMap);
- _unacknowledgedMessageMap.remove(ackedMessageMap);
- return ackedMessageMap.values();
+ return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
+
}
/**
@@ -949,12 +957,17 @@ public class AMQChannel implements Sessi
public void commit() throws AMQException
{
+ commit(null);
+ }
+ public void commit(Runnable immediateAction) throws AMQException
+ {
+
if (!isTransactional())
{
throw new AMQException("Fatal error: commit called on non-transactional channel");
}
- _transaction.commit();
+ _transaction.commit(immediateAction);
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
@@ -1033,7 +1046,7 @@ public class AMQChannel implements Sessi
{
if (isTransactional())
{
- _txnUpdateTime.set(System.currentTimeMillis());
+ _txnUpdateTime.set(getProtocolSession().getLastReceivedTime());
}
}
@@ -1079,20 +1092,6 @@ public class AMQChannel implements Sessi
return _messageStore;
}
- private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod()
- {
-
- public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
- throws AMQException
- {
- _session.registerMessageDelivered(entry.getMessage().getSize());
- getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
- deliveryTag, sub.getConsumerTag());
- entry.incrementDeliveryCount();
- }
-
- };
-
public ClientDeliveryMethod getClientDeliveryMethod()
{
return _clientDeliveryMethod;
@@ -1158,11 +1157,10 @@ public class AMQChannel implements Sessi
private class MessageDeliveryAction implements ServerTransaction.Action
{
private IncomingMessage _incommingMessage;
- private ArrayList<? extends BaseQueue> _destinationQueues;
+ private List<? extends BaseQueue> _destinationQueues;
public MessageDeliveryAction(IncomingMessage currentMessage,
- ArrayList<? extends BaseQueue> destinationQueues,
- boolean transactional)
+ List<? extends BaseQueue> destinationQueues)
{
_incommingMessage = currentMessage;
_destinationQueues = destinationQueues;
@@ -1177,8 +1175,10 @@ public class AMQChannel implements Sessi
final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
MessageReference ref = amqMessage.newReference();
- for(final BaseQueue queue : _destinationQueues)
+ for(int i = 0; i < _destinationQueues.size(); i++)
{
+ BaseQueue queue = _destinationQueues.get(i);
+
BaseQueue.PostEnqueueAction action;
if(immediate)
@@ -1190,7 +1190,7 @@ public class AMQChannel implements Sessi
action = null;
}
- queue.enqueue(amqMessage, action);
+ queue.enqueue(amqMessage, isTransactional(), action);
if(queue instanceof AMQQueue)
{
@@ -1198,6 +1198,8 @@ public class AMQChannel implements Sessi
}
}
+
+ _incommingMessage.getStoredMessage().flushToStore();
ref.release();
}
catch (AMQException e)
@@ -1539,7 +1541,7 @@ public class AMQChannel implements Sessi
final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry);
- final ArrayList<? extends BaseQueue> destinationQueues = altExchange.route(m);
+ final List<? extends BaseQueue> destinationQueues = altExchange.route(m);
if (destinationQueues == null || destinationQueues.isEmpty())
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java Wed Dec 28 13:02:41 2011
@@ -22,8 +22,8 @@ package org.apache.qpid.server;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.AMQException;
@@ -39,13 +39,13 @@ public class ExtractResendAndRequeue imp
private final Map<Long, QueueEntry> _msgToResend;
private final boolean _requeueIfUnabletoResend;
private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
- private final TransactionLog _transactionLog;
+ private final MessageStore _transactionLog;
public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
Map<Long, QueueEntry> msgToRequeue,
Map<Long, QueueEntry> msgToResend,
boolean requeueIfUnabletoResend,
- TransactionLog txnLog)
+ MessageStore txnLog)
{
_unacknowledgedMessageMap = unacknowledgedMessageMap;
_msgToRequeue = msgToRequeue;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Wed Dec 28 13:02:41 2011
@@ -46,10 +46,6 @@ public interface UnacknowledgedMessageMa
void add(long deliveryTag, QueueEntry message);
- void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs);
-
- void remove(Map<Long,QueueEntry> msgs);
-
QueueEntry remove(long deliveryTag);
Collection<QueueEntry> cancelAllMessages();
@@ -67,6 +63,8 @@ public interface UnacknowledgedMessageMa
*/
Set<Long> getDeliveryTags();
+ Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple);
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Wed Dec 28 13:02:41 2011
@@ -157,6 +157,14 @@ public class UnacknowledgedMessageMapImp
}
}
+ public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple)
+ {
+ Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
+ collect(deliveryTag, multiple, ackedMessageMap);
+ remove(ackedMessageMap);
+ return ackedMessageMap.values();
+ }
+
private void collect(long key, Map<Long, QueueEntry> msgs)
{
synchronized (_lock)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java Wed Dec 28 13:02:41 2011
@@ -115,4 +115,9 @@ public class Binding
return result;
}
+ public String toString()
+ {
+ return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+"}";
+ }
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Wed Dec 28 13:02:41 2011
@@ -351,11 +351,11 @@ public abstract class AbstractExchange i
- public final ArrayList<? extends BaseQueue> route(final InboundMessage message)
+ public final List<? extends BaseQueue> route(final InboundMessage message)
{
_receivedMessageCount.incrementAndGet();
_receivedMessageSize.addAndGet(message.getSize());
- final ArrayList<? extends BaseQueue> queues = doRoute(message);
+ final List<? extends BaseQueue> queues = doRoute(message);
if(!queues.isEmpty())
{
_routedMessageCount.incrementAndGet();
@@ -364,7 +364,7 @@ public abstract class AbstractExchange i
return queues;
}
- protected abstract ArrayList<? extends BaseQueue> doRoute(final InboundMessage message);
+ protected abstract List<? extends BaseQueue> doRoute(final InboundMessage message);
public long getMsgReceives()
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org